diff --git a/main.py b/main.py index c077f38..f0ac378 100644 --- a/main.py +++ b/main.py @@ -1,635 +1,664 @@ -import asyncio -import base64 -import codecs -import json -import re -import logging -import datetime -from typing import Dict, List, Optional -from contextlib import asynccontextmanager - -import httpx -from fastapi import FastAPI, Query, HTTPException -from bs4 import BeautifulSoup -from fastapi.responses import JSONResponse - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("HDHub-Resolver") - -# ========================================== -# CONFIGURATION -# ========================================== -HDHUB_BASE_URL = "https://4khdhub.dad" - - -def safe_b64decode(data: str) -> str: - if not data: - return "" - try: - data = data.strip() - missing = len(data) % 4 - if missing: - data += "=" * (4 - missing) - return base64.b64decode(data, validate=False).decode("utf-8", errors="ignore") - except Exception: - return "" - - -class HDHubEngine: - def __init__(self): - self.client = httpx.AsyncClient( - timeout=httpx.Timeout(30.0, connect=15.0), - follow_redirects=True, - headers={ - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - }, - ) - - # ========================================== - # UTILS - # ========================================== - def _convert_pixeldrain(self, url: str) -> str: - """Converts normal Pixeldrain viewer links to direct download API links.""" - match = re.search( - r"(https?://pixeldrain\.[a-z]+)/u/([a-zA-Z0-9_-]+)", url, re.IGNORECASE - ) - if match: - base_url = match.group(1) - file_id = match.group(2) - return f"{base_url}/api/file/{file_id}?download" - return url - - # ========================================== - # RESOLVER MODULE - # ========================================== - async def resolve(self, url: str, depth: int = 0) -> Dict: - if depth 
> 12: - return {"error": "Max recursion depth reached", "url": url} - - logger.info(f"Resolving (depth {depth}): {url}") - - if "pixeldrain" in url.lower(): - return { - "found_links": [ - { - "server": "Pixeldrain Direct", - "link": self._convert_pixeldrain(url), - } - ], - "count": 1, - "type": "direct", - } - - try: - resp = await self.client.get(url) - text = resp.text - final_url = str(resp.url) - - if any( - ext in final_url.lower() - for ext in [".mkv", ".mp4", ".m3u8", ".zip", "pixeldrain"] - ): - return { - "found_links": [ - { - "server": "Direct Link", - "link": self._convert_pixeldrain(final_url), - } - ], - "count": 1, - "type": "direct", - } - - if any( - k in final_url.lower() - for k in ["gadgetsweb", "cryptoinsights", "techly360", "?id="] - ): - return await self._handle_wrapper(final_url, text, depth) - - if any( - k in final_url.lower() - for k in ["hubcloud", "gamerxyt", "shikshakdaak", "drive/"] - ): - return await self._handle_hubcloud_style_page(final_url, text, depth) - - return await self._fallback_search(text, depth) - - except httpx.HTTPStatusError as e: - return {"error": f"HTTP {e.response.status_code}", "url": url} - except Exception as e: - logger.exception("Resolution error") - return {"error": str(e), "url": url} - - async def _handle_hubcloud_style_page( - self, page_url: str, text: str, depth: int - ) -> Dict: - soup = BeautifulSoup(text, "html.parser") - links_data = [] - seen = set() - - current_minute = str(datetime.datetime.now().minute).zfill(2) - base_to_ignore = set() - - s3 = soup.find("a", id="s3") - if s3 and s3.get("href"): - base_s3 = s3["href"].strip() - base_to_ignore.add(base_s3) - dynamic_s3 = base_s3 + "_1" + current_minute - seen.add(dynamic_s3) - links_data.append( - { - "server": s3.text.strip().replace("Download", "").strip() - or "FSLv2 Server", - "link": dynamic_s3, - } - ) - - fsl = soup.find("a", id="fsl") - if fsl and fsl.get("href"): - base_fsl = fsl["href"].strip() - base_to_ignore.add(base_fsl) - 
dynamic_fsl = base_fsl + "1" + current_minute - seen.add(dynamic_fsl) - links_data.append( - { - "server": fsl.text.strip().replace("Download", "").strip() - or "FSL Server", - "link": dynamic_fsl, - } - ) - - junk_keywords = [ - "hubcloud.fans", - "drive/admin", - "t.me", - "tinyurl.com", - "one.one.one.one", - "google.com/search", - "ampproject.org", - "bloggingvector.shop", - ] - - for a in soup.find_all("a", href=True): - href = a["href"].strip() - if not href or href.startswith(("#", "javascript:")): - continue - if href.startswith("/"): - href = f"https://{httpx.URL(page_url).host}{href}" - if href in base_to_ignore or any(j in href.lower() for j in junk_keywords): - continue - - if "gamerxyt.com/hubcloud.php" in href.lower() and "host=" in href.lower(): - return await self.resolve(href, depth + 1) - - if href not in seen: - seen.add(href) - label = a.text.strip().replace("Download", "").strip() - links_data.append({"server": label or "Unknown Server", "link": href}) - - download_links = [] - for item in links_data: - lower_link = item["link"].lower() - if any( - x in lower_link - for x in [ - ".mkv", - ".mp4", - ".zip", - "pixeldrain", - "fsl-buckets", - "toxix.buzz", - "hubcdn.fans", - "cloudserver", - ] - ): - item["link"] = self._convert_pixeldrain(item["link"]) - download_links.append(item) - - if download_links: - return { - "found_links": download_links, - "count": len(download_links), - "source": "hubcloud_gamerxyt", - "page_url": page_url, - } - - return {"detail": "No usable download links found", "page_url": page_url} - - async def _handle_wrapper(self, url: str, text: str, depth: int) -> Dict: - direct_match = re.search( - r'(https?://(?:www\.)?gamerxyt\.com/hubcloud\.php\?[^"\'>\s]+)', text - ) - if direct_match: - return await self.resolve(direct_match.group(1), depth + 1) - - for b64 in re.findall(r"[A-Za-z0-9+/=]{40,}", text): - dec = safe_b64decode(b64) - if "gamerxyt.com" in dec or "hubcloud.php" in dec: - m = 
re.search(r'(https?://[^"\'>\s]+)', dec) - if m: - return await self.resolve(m.group(1), depth + 1) - - patterns = [ - r"s\('o','([A-Za-z0-9+/=]+)'", - r"ck\('_wp_http[^']*','([^']+)'", - r"['\"]([A-Za-z0-9+/=]{100,})['\"]", - ] - combined = "".join(["".join(re.findall(pat, text)) for pat in patterns]) - - if combined: - step1 = safe_b64decode(combined) - step2 = safe_b64decode(step1) - rotated = codecs.encode(step2, "rot_13") if step2 else step2 - final_str = safe_b64decode(rotated) - try: - if final_str.strip().startswith("{"): - data = json.loads(final_str) - next_url = data.get("o") or data.get("url") or data.get("link") - if next_url: - decoded = safe_b64decode(next_url) - final = decoded if decoded.startswith("http") else next_url - return await self.resolve(final, depth + 1) - except Exception: - pass - - return {"error": "Wrapper failed to extract link", "url": url} - - async def _fallback_search(self, text: str, depth: int) -> Dict: - for b64 in re.findall(r"[A-Za-z0-9+/=]{100,}", text): - dec = safe_b64decode(b64) - if dec.startswith(("http://", "https://")): - return await self.resolve(dec, depth + 1) - return {"error": "Unsupported page", "detail": "No recognizable pattern"} - - # ========================================== - # SCRAPING MODULE (Home, Info, Search) - # ========================================== - async def get_home(self, cat_type: str = "latest_movie") -> Dict: - cat_type = cat_type.lower().strip() - if cat_type in ["movies", "movie"]: - cat_type = "movie" - elif cat_type in ["lates_movie", "latest_movies", "latest"]: - cat_type = "latest_movie" - - paths = { - "movie": "/category/movies/", - "anime": "/category/anime/", - "ott": "/category/web-series/", - "latest_movie": "/", - } - target_path = paths.get(cat_type, "/") - url = f"{HDHUB_BASE_URL.rstrip('/')}{target_path}" - - try: - resp = await self.client.get(url) - if resp.status_code == 404 and cat_type != "latest_movie": - fallback_paths = { - "movie": "/movies/", - "anime": 
"/anime/", - "ott": "/web-series/", - } - url = f"{HDHUB_BASE_URL.rstrip('/')}{fallback_paths.get(cat_type, target_path)}" - resp = await self.client.get(url) - - return self._parse_movie_cards(resp.text, url, category=cat_type) - except Exception as e: - return {"error": str(e)} - - async def search(self, query: str) -> Dict: - url = f"{HDHUB_BASE_URL.rstrip('/')}/" - try: - resp = await self.client.get(url, params={"s": query}) - return self._parse_movie_cards(resp.text, str(resp.url), query=query) - except Exception as e: - return {"error": str(e), "query": query} - - def _parse_movie_cards( - self, html: str, source_url: str, category: str = None, query: str = None - ) -> Dict: - """Helper to parse movie cards for both home and search endpoints.""" - soup = BeautifulSoup(html, "html.parser") - results = [] - - for a_tag in soup.find_all("a", class_="movie-card"): - href = a_tag.get("href") - if not href: - continue - if href.startswith("/"): - href = HDHUB_BASE_URL.rstrip("/") + href - - title_tag = a_tag.find("h3", class_="movie-card-title") - title = title_tag.text.strip() if title_tag else "Unknown Title" - - img_tag = a_tag.find("img") - img_url = img_tag.get("src") if img_tag else "" - - if title: - results.append({"title": title, "url": href, "image": img_url}) - - unique_results = [] - seen = set() - for r in results: - if r["url"] not in seen: - seen.add(r["url"]) - unique_results.append(r) - - response = {"source": source_url} - if category: - response["category"] = category - if query: - response["query"] = query - response["results"] = unique_results - - return response - - async def get_info(self, url: str) -> Dict: - try: - resp = await self.client.get(url) - soup = BeautifulSoup(resp.text, "html.parser") - - # Basic details - title_tag = soup.find("h1", class_="page-title") or soup.find("h1") - title = title_tag.text.strip() if title_tag else "Unknown" - - poster_tag = soup.find("img", src=re.compile(r"tmdb\.org")) - poster = poster_tag.get("src") 
if poster_tag else "" - - # --------------------------------------------------------- - # HTML SCRAPING: Score, Genres, Year, Cast, Trailer - # --------------------------------------------------------- - score = "" - score_tag = soup.find("span", class_="imdb-score") - if score_tag: - score = score_tag.text.strip() - - genres = [] - junk_tags = [ - "1080p", - "2160p", - "720p", - "dv hdr", - "sdr", - "movies", - "series", - "hindi", - "english", - ] - for a_tag in soup.select(".badge.badge-outline a[href^='/category/']"): - tag_text = a_tag.text.strip() - if tag_text.lower() not in junk_tags: - genres.append(tag_text) - - trailer_url = "" - trailer_btn = soup.find(id="trailer-btn") - if trailer_btn and trailer_btn.get("data-trailer-url"): - trailer_url = trailer_btn.get("data-trailer-url") - - # Extracting Year & Cast from the Metadata List - year = "" - cast = "" - for item in soup.find_all("div", class_="metadata-item"): - label = item.find("span", class_="metadata-label") - value = item.find("span", class_="metadata-value") - if label and value: - lbl = label.text.strip().lower() - val = value.text.strip() - - if "air" in lbl or "release" in lbl or "year" in lbl: - match = re.search(r"\d{4}", val) - if match: - year = match.group(0) - - if "stars" in lbl or "cast" in lbl: - cast = val - - description = "" - for p in soup.find_all("p"): - text_clean = p.text.strip() - if ( - len(text_clean) > 50 - and "download" not in text_clean.lower() - and ( - "band together" in text_clean.lower() - or "young" in text_clean.lower() - or len(text_clean) > 80 - ) - ): - description = text_clean - break - - # --------------------------------------------------------- - # PARSE PACKS & EPISODES - # --------------------------------------------------------- - download_links = {"packs": [], "episodes": []} - seen_pack_links = set() - seen_episode_links = set() - - # PACKS - for pack_item in soup.find_all("div", class_="download-item"): - header = pack_item.find("div", 
class_="download-header") - if not header: - continue - pack_name = re.sub(r"\s+", " ", header.text.replace("\n", " ").strip()) - - links_div = pack_item.find("div", class_="grid grid-cols-2 gap-2") - if links_div: - pack_links_list = [] - for btn in links_div.find_all("a", class_="btn"): - href, server_name = ( - btn.get("href"), - btn.text.strip().replace("\xa0", "").strip(), - ) - - # --- EXCLUDE HUBDRIVE LINKS --- - if "hubdrive" in server_name.lower(): - continue - - uniq_key = (pack_name, server_name) - if href and uniq_key not in seen_pack_links: - seen_pack_links.add(uniq_key) - pack_links_list.append({"server": server_name, "url": href}) - if pack_links_list: - download_links["packs"].append( - {"title": pack_name, "links": pack_links_list} - ) - - # EPISODES - grouped_episodes = {} - current_season_context = "Unknown Quality" - - elements = soup.find_all( - lambda tag: ( - tag.name == "div" - and tag.get("class") - and ( - "episode-header" in tag.get("class") - or "episode-download-item" in tag.get("class") - ) - ) - ) - - for element in elements: - classes = element.get("class", []) - - if "episode-header" in classes: - current_season_context = re.sub(r"\s+", " ", element.text.strip()) - if current_season_context not in grouped_episodes: - grouped_episodes[current_season_context] = {} - - elif "episode-download-item" in classes: - ep_info_tag = element.find("span", class_="badge-psa") - ep_num_str = re.sub( - r"\s+", - " ", - ep_info_tag.text.strip() if ep_info_tag else "Unknown Episode", - ) - - if current_season_context not in grouped_episodes: - grouped_episodes[current_season_context] = {} - if ep_num_str not in grouped_episodes[current_season_context]: - grouped_episodes[current_season_context][ep_num_str] = [] - - links_div = element.find("div", class_="episode-links") - if links_div: - for btn in links_div.find_all("a", class_="btn"): - href, server_name = ( - btn.get("href"), - btn.text.strip().replace("\xa0", "").strip(), - ) - - # --- EXCLUDE 
HUBDRIVE LINKS --- - if "hubdrive" in server_name.lower(): - continue - - uniq_key = (current_season_context, ep_num_str, server_name) - if href and uniq_key not in seen_episode_links: - seen_episode_links.add(uniq_key) - grouped_episodes[current_season_context][ - ep_num_str - ].append({"server": server_name, "url": href}) - - # Format array - formatted_episodes = [] - for season_quality, episodes_dict in grouped_episodes.items(): - if not episodes_dict: - continue - ep_list = [] - for ep_name, links in episodes_dict.items(): - # Only add the episode to the final array if there is at least one link (removes empty HubDrive-only episodes if any exist) - if links: - ep_list.append({"episode": ep_name, "links": links}) - - if ep_list: - formatted_episodes.append( - {"season_quality": season_quality, "episodes": ep_list} - ) - - download_links["episodes"] = formatted_episodes - - return { - "title": title, - "year": year, - "score": score, - "genres": genres, - "cast": cast, - "trailer": trailer_url, - "poster": poster, - "description": description, - "download_links": download_links, - "page_url": url, - } - except Exception as e: - return {"error": str(e), "url": url} - - -# ====================== FastAPI ====================== -engine = HDHubEngine() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - yield - await engine.client.aclose() - - -app = FastAPI(lifespan=lifespan, title="HDHub Scraper & Resolver v4.3") - - -@app.get("/") -async def root_directory(): - return JSONResponse( - { - "name": "HDHub API Scraper & Resolver", - "version": "4.3", - "features": "Added Search Endpoint. Native extraction for Score/Year/Genres/Cast. Auto-Pixeldrain resolver. HubDrive filtered.", - "endpoints": { - "/home": { - "description": "Fetch homepage or category-specific movie/series lists.", - "method": "GET", - "parameters": { - "type": "(Optional) movie, anime, ott, latest_movie." 
- }, - "example": "/home?type=anime", - }, - "/search": { - "description": "Search for movies or series by title.", - "method": "GET", - "parameters": {"query": "(Required) The search keyword."}, - "example": "/search?query=batman", - }, - "/info": { - "description": "Scrape full info (Metadata, Packs, Episodes). HubDrive links are automatically hidden.", - "method": "GET", - "parameters": { - "url": "(Required) The full HDHub post URL obtained from /home or /search." - }, - "example": "/info?url=https://4khdhub.dad/xo-kitty-series-1017/", - }, - "/resolve": { - "description": "Bypass intermediate ad/wrapper links to get final download URL. Automatically converts Pixeldrain links.", - "method": "GET", - "parameters": {"url": "(Required) Wrapper URL from /info."}, - "example": "/resolve?url=https://gadgetsweb.xyz/?id=...", - }, - }, - } - ) - - -@app.get("/home") -async def api_home( - type: str = Query( - "latest_movie", - description="Type of content available: movie, anime, ott, latest_movie", - ), -): - return await engine.get_home(type) - - -@app.get("/search") -async def api_search(query: str = Query(..., description="The search keyword")): - return await engine.search(query) - - -@app.get("/info") -async def api_info( - url: str = Query(..., description="Post URL (e.g. 
def safe_b64decode(data: str) -> str:
    """Leniently decode a base64 string to text, returning "" on failure.

    Missing ``=`` padding is restored before decoding, and bytes that are not
    valid UTF-8 are dropped from the result.

    Args:
        data: Base64 text, possibly unpadded or containing whitespace.

    Returns:
        The decoded text, or an empty string when ``data`` is empty or cannot
        be decoded.
    """
    if not data:
        return ""
    try:
        # Remove *all* whitespace before computing the padding. The decoder
        # (validate=False) silently discards embedded whitespace, so padding
        # derived from the raw length would be wrong for line-wrapped input
        # and the decode would fail with "Incorrect padding".
        compact = "".join(data.split())
        remainder = len(compact) % 4
        if remainder:
            compact += "=" * (4 - remainder)
        raw = base64.b64decode(compact, validate=False)
        return raw.decode("utf-8", errors="ignore")
    except (ValueError, TypeError):
        # binascii.Error is a ValueError subclass; TypeError covers non-str input.
        return ""
async def resolve(self, url: str, depth: int = 0) -> Dict:
    """Follow ``url`` through wrapper pages until download links are found.

    Args:
        url: Absolute URL to resolve.
        depth: Current recursion depth; resolution stops once it exceeds 12.

    Returns:
        A dict. On success it contains ``found_links`` (a list of
        ``{"server", "link"}`` dicts) and ``count``. On failure it contains
        ``error`` and usually ``url``. Network and parsing errors are reported
        in the returned dict rather than raised.
    """
    if depth > 12:
        return {"error": "Max recursion depth reached", "url": url}

    logger.info("Resolving (depth %s): %s", depth, url)

    if "pixeldrain" in url.lower():
        return {
            "found_links": [
                {
                    "server": "Pixeldrain Direct",
                    "link": self._convert_pixeldrain(url),
                }
            ],
            "count": 1,
            "type": "direct",
        }

    direct_markers = (".mkv", ".mp4", ".m3u8", ".zip", "pixeldrain")
    wrapper_markers = ("gadgetsweb", "cryptoinsights", "techly360", "?id=")
    hubcloud_markers = ("hubcloud", "gamerxyt", "shikshakdaak", "drive/")

    try:
        # Stream the response so that only headers are fetched up front: when
        # the final URL is a direct media file its (potentially huge) body is
        # never downloaded into memory.
        async with self.client.stream("GET", url) as resp:
            final_url = str(resp.url)
            lowered = final_url.lower()
            is_direct = any(marker in lowered for marker in direct_markers)
            text = ""
            if not is_direct:
                # Without this call httpx never raises HTTPStatusError, so
                # the handler below was unreachable and error pages were
                # parsed as if they were real content.
                resp.raise_for_status()
                await resp.aread()
                text = resp.text

        if is_direct:
            return {
                "found_links": [
                    {
                        "server": "Direct Link",
                        "link": self._convert_pixeldrain(final_url),
                    }
                ],
                "count": 1,
                "type": "direct",
            }

        if any(marker in lowered for marker in wrapper_markers):
            return await self._handle_wrapper(final_url, text, depth)

        if any(marker in lowered for marker in hubcloud_markers):
            return await self._handle_hubcloud_style_page(final_url, text, depth)

        return await self._fallback_search(text, depth)

    except httpx.HTTPStatusError as e:
        return {"error": f"HTTP {e.response.status_code}", "url": url}
    except Exception as e:
        # Top-level boundary for the API: log with traceback, report in-band.
        logger.exception("Resolution error")
        return {"error": str(e), "url": url}
async def _handle_hubcloud_style_page(
    self, page_url: str, text: str, depth: int
) -> Dict:
    """Extract download links from a HubCloud-style landing page.

    Args:
        page_url: Final URL of the page; used to absolutise relative hrefs.
        text: HTML of the page.
        depth: Current recursion depth, forwarded when a nested
            ``gamerxyt.com/hubcloud.php`` link has to be resolved.

    Returns:
        ``{"found_links", "count", "source", "page_url"}`` when at least one
        link looks like a download target, otherwise
        ``{"detail", "page_url"}``. If a ``hubcloud.php?...host=`` link is
        found, the result of resolving that link is returned instead.
    """
    # Function-scope import keeps this fix self-contained.
    from urllib.parse import urljoin

    soup = BeautifulSoup(text, "html.parser")
    links_data = []
    seen = set()
    base_to_ignore = set()

    current_minute = str(datetime.datetime.now().minute).zfill(2)

    # The #s3 and #fsl anchors get a suffix built from the current minute
    # appended to their href; the un-suffixed href is ignored further down.
    dynamic_anchors = (
        ("s3", "_1", "FSLv2 Server"),
        ("fsl", "1", "FSL Server"),
    )
    for anchor_id, suffix, default_label in dynamic_anchors:
        anchor = soup.find("a", id=anchor_id)
        if not (anchor and anchor.get("href")):
            continue
        base_href = anchor["href"].strip()
        base_to_ignore.add(base_href)
        dynamic_href = base_href + suffix + current_minute
        seen.add(dynamic_href)
        links_data.append(
            {
                "server": anchor.text.strip().replace("Download", "").strip()
                or default_label,
                "link": dynamic_href,
            }
        )

    junk_keywords = (
        "hubcloud.fans",
        "drive/admin",
        "t.me",
        "tinyurl.com",
        "one.one.one.one",
        "google.com/search",
        "ampproject.org",
        "bloggingvector.shop",
    )

    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        if not href or href.startswith(("#", "javascript:")):
            continue
        if href.startswith("/"):
            # urljoin handles both "/path" and protocol-relative "//host/path"
            # and keeps the page's own scheme and port. Gluing the href onto
            # "https://<host>" turned "//cdn.example/f.mkv" into
            # "https://<host>//cdn.example/f.mkv".
            href = urljoin(page_url, href)
        lowered = href.lower()
        if href in base_to_ignore or any(j in lowered for j in junk_keywords):
            continue

        if "gamerxyt.com/hubcloud.php" in lowered and "host=" in lowered:
            return await self.resolve(href, depth + 1)

        if href not in seen:
            seen.add(href)
            label = a.text.strip().replace("Download", "").strip()
            links_data.append({"server": label or "Unknown Server", "link": href})

    download_markers = (
        ".mkv",
        ".mp4",
        ".zip",
        "pixeldrain",
        "fsl-buckets",
        "toxix.buzz",
        "hubcdn.fans",
        "cloudserver",
    )
    download_links = []
    for item in links_data:
        if any(marker in item["link"].lower() for marker in download_markers):
            item["link"] = self._convert_pixeldrain(item["link"])
            download_links.append(item)

    if download_links:
        return {
            "found_links": download_links,
            "count": len(download_links),
            "source": "hubcloud_gamerxyt",
            "page_url": page_url,
        }

    return {"detail": "No usable download links found", "page_url": page_url}
str, text: str, depth: int) -> Dict: + direct_match = re.search( + r'(https?://(?:www\.)?gamerxyt\.com/hubcloud\.php\?[^"\'>\s]+)', text + ) + if direct_match: + return await self.resolve(direct_match.group(1), depth + 1) + + for b64 in re.findall(r"[A-Za-z0-9+/=]{40,}", text): + dec = safe_b64decode(b64) + if "gamerxyt.com" in dec or "hubcloud.php" in dec: + m = re.search(r'(https?://[^"\'>\s]+)', dec) + if m: + return await self.resolve(m.group(1), depth + 1) + + patterns = [ + r"s\('o','([A-Za-z0-9+/=]+)'", + r"ck\('_wp_http[^']*','([^']+)'", + r"['\"]([A-Za-z0-9+/=]{100,})['\"]", + ] + combined = "".join(["".join(re.findall(pat, text)) for pat in patterns]) + + if combined: + step1 = safe_b64decode(combined) + step2 = safe_b64decode(step1) + rotated = codecs.encode(step2, "rot_13") if step2 else step2 + final_str = safe_b64decode(rotated) + try: + if final_str.strip().startswith("{"): + data = json.loads(final_str) + next_url = data.get("o") or data.get("url") or data.get("link") + if next_url: + decoded = safe_b64decode(next_url) + final = decoded if decoded.startswith("http") else next_url + return await self.resolve(final, depth + 1) + except Exception: + pass + + return {"error": "Wrapper failed to extract link", "url": url} + + async def _fallback_search(self, text: str, depth: int) -> Dict: + for b64 in re.findall(r"[A-Za-z0-9+/=]{100,}", text): + dec = safe_b64decode(b64) + if dec.startswith(("http://", "https://")): + return await self.resolve(dec, depth + 1) + return {"error": "Unsupported page", "detail": "No recognizable pattern"} + + # ========================================== + # SCRAPING MODULE (Home, Info, Search) + # ========================================== + async def get_home(self, cat_type: str = "latest_movie") -> Dict: + cat_type = cat_type.lower().strip() + if cat_type in ["movies", "movie"]: + cat_type = "movie" + elif cat_type in ["lates_movie", "latest_movies", "latest"]: + cat_type = "latest_movie" + + paths = { + "movie": 
"/category/movies/", + "anime": "/category/anime/", + "ott": "/category/web-series/", + "latest_movie": "/", + } + target_path = paths.get(cat_type, "/") + url = f"{HDHUB_BASE_URL.rstrip('/')}{target_path}" + + try: + resp = await self.client.get(url) + if resp.status_code == 404 and cat_type != "latest_movie": + fallback_paths = { + "movie": "/movies/", + "anime": "/anime/", + "ott": "/web-series/", + } + url = f"{HDHUB_BASE_URL.rstrip('/')}{fallback_paths.get(cat_type, target_path)}" + resp = await self.client.get(url) + + return self._parse_movie_cards(resp.text, url, category=cat_type) + except Exception as e: + return {"error": str(e)} + + async def search(self, query: str) -> Dict: + url = f"{HDHUB_BASE_URL.rstrip('/')}/" + try: + resp = await self.client.get(url, params={"s": query}) + return self._parse_movie_cards(resp.text, str(resp.url), query=query) + except Exception as e: + return {"error": str(e), "query": query} + + def _parse_movie_cards( + self, html: str, source_url: str, category: str = None, query: str = None + ) -> Dict: + """Helper to parse movie cards for both home and search endpoints.""" + soup = BeautifulSoup(html, "html.parser") + results = [] + + for a_tag in soup.find_all("a", class_="movie-card"): + href = a_tag.get("href") + if not href: + continue + if href.startswith("/"): + href = HDHUB_BASE_URL.rstrip("/") + href + + title_tag = a_tag.find("h3", class_="movie-card-title") + title = title_tag.text.strip() if title_tag else "Unknown Title" + + img_tag = a_tag.find("img") + img_url = img_tag.get("src") if img_tag else "" + + if title: + results.append({"title": title, "url": href, "image": img_url}) + + unique_results = [] + seen = set() + for r in results: + if r["url"] not in seen: + seen.add(r["url"]) + unique_results.append(r) + + response = {"source": source_url} + if category: + response["category"] = category + if query: + response["query"] = query + response["results"] = unique_results + + return response + + async def 
async def get_info(self, url: str) -> Dict:
    """Scrape metadata and download links from a post page.

    The page is classified as ``"series"`` or ``"movie"``. For a series only
    per-episode links are returned (under ``download_links["episodes"]``);
    for a movie the per-quality download items are returned (under
    ``download_links["movie_links"]``). Buttons whose label contains
    "hubdrive" are skipped in both cases.

    Args:
        url: Absolute URL of the post page.

    Returns:
        A dict with ``title``, ``type``, ``year``, ``score``, ``genres``,
        ``cast``, ``trailer``, ``poster``, ``description``, ``download_links``
        and ``page_url``; or ``{"error", "url"}`` if anything fails.
    """
    try:
        resp = await self.client.get(url)
        soup = BeautifulSoup(resp.text, "html.parser")

        def collect_links(container, key_prefix, seen_keys):
            """Return non-HubDrive button links in ``container``, one per server label."""
            collected = []
            for btn in container.find_all("a", class_="btn"):
                href = btn.get("href")
                server_name = btn.text.strip().replace("\xa0", "").strip()

                # EXCLUDE HUBDRIVE LINKS
                if "hubdrive" in server_name.lower():
                    continue

                uniq_key = key_prefix + (server_name,)
                if href and uniq_key not in seen_keys:
                    seen_keys.add(uniq_key)
                    collected.append({"server": server_name, "url": href})
            return collected

        # Basic details
        title_tag = soup.find("h1", class_="page-title") or soup.find("h1")
        title = title_tag.text.strip() if title_tag else "Unknown"

        poster_tag = soup.find("img", src=re.compile(r"tmdb\.org"))
        poster = poster_tag.get("src") if poster_tag else ""

        # ---------------------------------------------------------
        # HTML SCRAPING: Score, Genres, Year, Cast, Trailer
        # ---------------------------------------------------------
        score_tag = soup.find("span", class_="imdb-score")
        score = score_tag.text.strip() if score_tag else ""

        junk_tags = {
            "1080p",
            "2160p",
            "720p",
            "dv hdr",
            "sdr",
            "movies",
            "series",
            "hindi",
            "english",
        }
        category_tags = [
            a_tag.text.strip()
            for a_tag in soup.select(".badge.badge-outline a[href^='/category/']")
        ]
        genres = [tag for tag in category_tags if tag.lower() not in junk_tags]

        # --- DETERMINE IF MOVIE OR SERIES ---
        # Inspect the *unfiltered* category tags: "series" is one of the junk
        # tags stripped from `genres`, so testing `genres` could never see it.
        series_tags = {"series", "web series", "tv show"}
        is_series = (
            "-series-" in url.lower()
            or any(tag.lower() in series_tags for tag in category_tags)
            or bool(
                soup.find("div", class_="episode-download-item")
                or soup.find(id="episodes")
            )
        )
        content_type = "series" if is_series else "movie"

        trailer_url = ""
        trailer_btn = soup.find(id="trailer-btn")
        if trailer_btn and trailer_btn.get("data-trailer-url"):
            trailer_url = trailer_btn.get("data-trailer-url")

        # Extracting Year & Cast from the Metadata List
        year = ""
        cast = ""
        for item in soup.find_all("div", class_="metadata-item"):
            label = item.find("span", class_="metadata-label")
            value = item.find("span", class_="metadata-value")
            if not (label and value):
                continue
            lbl = label.text.strip().lower()
            val = value.text.strip()

            if "air" in lbl or "release" in lbl or "year" in lbl:
                match = re.search(r"\d{4}", val)
                if match:
                    year = match.group(0)

            if "stars" in lbl or "cast" in lbl:
                cast = val

        # First paragraph that looks like a synopsis rather than site boilerplate.
        description = ""
        for p in soup.find_all("p"):
            text_clean = p.text.strip()
            text_lower = text_clean.lower()
            if (
                len(text_clean) > 50
                and "download" not in text_lower
                and (
                    "band together" in text_lower
                    or "young" in text_lower
                    or len(text_clean) > 80
                )
            ):
                description = text_clean
                break

        # ---------------------------------------------------------
        # PARSE LINKS BASED ON TYPE (Movie vs Series)
        # ---------------------------------------------------------
        download_links = {}

        if content_type == "series":
            # EPISODES PARSING (Ignores "Packs" completely)
            grouped_episodes = {}
            current_season_context = "Unknown Quality"
            seen_episode_links = set()

            # Headers and items are fetched together, in document order, so
            # each episode item is filed under the header that precedes it.
            elements = soup.find_all(
                lambda tag: (
                    tag.name == "div"
                    and tag.get("class")
                    and (
                        "episode-header" in tag.get("class")
                        or "episode-download-item" in tag.get("class")
                    )
                )
            )

            for element in elements:
                classes = element.get("class", [])

                if "episode-header" in classes:
                    current_season_context = re.sub(
                        r"\s+", " ", element.text.strip()
                    )
                    grouped_episodes.setdefault(current_season_context, {})

                elif "episode-download-item" in classes:
                    ep_info_tag = element.find("span", class_="badge-psa")
                    ep_num_str = re.sub(
                        r"\s+",
                        " ",
                        ep_info_tag.text.strip()
                        if ep_info_tag
                        else "Unknown Episode",
                    )

                    episode_links = grouped_episodes.setdefault(
                        current_season_context, {}
                    ).setdefault(ep_num_str, [])

                    links_div = element.find("div", class_="episode-links")
                    if links_div:
                        episode_links.extend(
                            collect_links(
                                links_div,
                                (current_season_context, ep_num_str),
                                seen_episode_links,
                            )
                        )

            # Format array
            formatted_episodes = []
            for season_quality, episodes_dict in grouped_episodes.items():
                # Remove empty episodes if HubDrive was the only link
                ep_list = [
                    {"episode": ep_name, "links": links}
                    for ep_name, links in episodes_dict.items()
                    if links
                ]
                if ep_list:
                    formatted_episodes.append(
                        {"season_quality": season_quality, "episodes": ep_list}
                    )

            download_links["episodes"] = formatted_episodes

        else:
            # MOVIE PARSING (Grabs direct files/qualities)
            movie_links = []
            seen_movie_links = set()

            for pack_item in soup.find_all("div", class_="download-item"):
                header = pack_item.find("div", class_="download-header")
                if not header:
                    continue
                quality_name = re.sub(
                    r"\s+", " ", header.text.replace("\n", " ").strip()
                )

                links_div = pack_item.find("div", class_="grid grid-cols-2 gap-2")
                if not links_div:
                    continue
                btn_links = collect_links(
                    links_div, (quality_name,), seen_movie_links
                )
                if btn_links:
                    movie_links.append(
                        {"quality": quality_name, "links": btn_links}
                    )

            download_links["movie_links"] = movie_links

        return {
            "title": title,
            "type": content_type,
            "year": year,
            "score": score,
            "genres": genres,
            "cast": cast,
            "trailer": trailer_url,
            "poster": poster,
            "description": description,
            "download_links": download_links,
            "page_url": url,
        }
    except Exception as e:
        # API boundary: report the failure in-band instead of raising.
        return {"error": str(e), "url": url}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: close the shared HTTP client on shutdown."""
    try:
        yield
    finally:
        # Runs even when an exception or cancellation is thrown into the
        # generator at the yield point, so the client is never leaked.
        await engine.client.aclose()
Automatically converts Pixeldrain links.", + "method": "GET", + "parameters": {"url": "(Required) Wrapper URL from /info."}, + "example": "/resolve?url=https://gadgetsweb.xyz/?id=...", + }, + }, + } + ) + + +@app.get("/home") +async def api_home( + type: str = Query( + "latest_movie", + description="Type of content available: movie, anime, ott, latest_movie", + ), +): + return await engine.get_home(type) + + +@app.get("/search") +async def api_search(query: str = Query(..., description="The search keyword")): + return await engine.search(query) + + +@app.get("/info") +async def api_info( + url: str = Query(..., description="Post URL (e.g. from /home or /search)"), +): + if not url.startswith(("http://", "https://")): + raise HTTPException(400, "Invalid URL") + return await engine.get_info(url) + + +@app.get("/resolve") +async def api_resolve(url: str = Query(..., description="Wrapper URL to resolve")): + if not url.startswith(("http://", "https://")): + raise HTTPException(400, "Invalid URL") + return await engine.resolve(url) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=7860)