# -*- coding: utf-8 -*- from __future__ import absolute_import import io import socket import asyncio import websockets import json import logging import os import random import rarfile import re import zipfile from subzero.language import Language from guessit import guessit from requests import Session from subliminal.providers import ParserBeautifulSoup, Provider from subliminal import __short_version__ from subliminal.cache import SHOW_EXPIRATION_TIME, region from subliminal.score import get_equivalent_release_groups from subliminal.subtitle import SUBTITLE_EXTENSIONS, Subtitle, fix_line_ending from subliminal.utils import sanitize, sanitize_release_group from subliminal.video import Movie from subliminal_patch.subtitle import guess_matches logger = logging.getLogger(__name__) browserless_ws_endpoint = 'ws://localhost:4000' year_re = re.compile(r'^\((\d{4})\)$') class Subs4FreeSubtitle(Subtitle): """Subs4Free Subtitle.""" provider_name = 'subs4free' def __init__(self, language, page_link, title, year, version, download_link, uploader): super(Subs4FreeSubtitle, self).__init__(language, page_link=page_link) self.title = title self.year = year self.version = version self.release_info = version self.download_link = download_link self.uploader = uploader self.hearing_impaired = None self.encoding = 'utf8' @property def id(self): return self.download_link def get_matches(self, video): matches = set() # movie if isinstance(video, Movie): # title if video.title and (sanitize(self.title) in ( sanitize(name) for name in [video.title] + video.alternative_titles)): matches.add('title') # year if video.year and self.year == video.year: matches.add('year') # release_group if (video.release_group and self.version and any(r in sanitize_release_group(self.version) for r in get_equivalent_release_groups(sanitize_release_group(video.release_group)))): matches.add('release_group') # other properties matches |= guess_matches(video, guessit(self.version, {'type': 'movie'}), partial=True) return matches class Subs4FreeProvider(Provider): """Subs4Free Provider.""" languages = {Language(l) for l in ['ell', 'eng']} video_types = (Movie,) server_url = 'https://www.subs4free.info' download_url = '/getSub.php' search_url = '/search_report.php?search={}&searchType=1' anti_block_1 = 'https://images.subs4free.info/favicon.ico' anti_block_2 = 'https://www.subs4series.com/includes/anti-block-layover.php?launch=1' anti_block_3 = 'https://www.subs4series.com/includes/anti-block.php' subtitle_class = Subs4FreeSubtitle def __init__(self): self.session = None def initialize(self): self.session = Session() from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST self.session.headers['User-Agent'] = AGENT_LIST[random.randint(0, len(AGENT_LIST) - 1)] def terminate(self): self.session.close() def get_show_links(self, title, year=None): """Get the matching show links for `title` and `year`. First search in the result of :meth:`_get_show_suggestions`. :param title: show title. :param year: year of the show, if any. :type year: int :return: the show links, if found. :rtype: list of str """ title = sanitize(title) suggestions = self._get_suggestions(title) show_links = [] for suggestion in suggestions: show_title = sanitize(suggestion['title']) if show_title == title or (year and show_title == '{title} {year:d}'.format(title=title, year=year)): logger.debug('Getting show id') show_links.append(suggestion['link'].split('?p=')[-1]) return show_links @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME, should_cache_fn=lambda value: value) def _get_suggestions(self, title): """Search the show or movie id from the `title` and `year`. :param str title: title of the show. :return: the show suggestions found. :rtype: list of dict """ # make the search logger.info('Searching show ids with %r', title) search_url = self.server_url + self.search_url.format(title) logger.debug('Requesting URL: %s', search_url) try: r = self.session.get(search_url, headers={'Referer': self.server_url}, timeout=10) except socket.gaierror as e: logger.error(f"DNS resolution failed for {search_url}: {e}") return [] r.raise_for_status() if not r.content: logger.debug('No data returned from provider') return [] soup = ParserBeautifulSoup(r.content, ['html.parser']) suggestions = [{'link': l.attrs['value'], 'title': l.text} for l in soup.select('select[name="Mov_sel"] > option[value]')] logger.debug('Found suggestions: %r', suggestions) return suggestions def query(self, movie_id, title, year): # get the season list of the show logger.info('Getting the subtitle list of show id %s', movie_id) if movie_id: page_link = self.server_url + '/' + movie_id else: page_link = self.server_url + self.search_url.format(' '.join([title, str(year)])) logger.debug('Requesting URL: %s', page_link) try: r = self.session.get(page_link, timeout=10) except socket.gaierror as e: logger.error(f"DNS resolution failed for {page_link}: {e}") return [] r.raise_for_status() if not r.content: logger.debug('No data returned from provider') return [] soup = ParserBeautifulSoup(r.content, ['html.parser']) year = None year_element = soup.select_one('td#dates_header > table div') matches = False if year_element: matches = year_re.match(str(year_element.contents[2]).strip()) if matches: year = int(matches.group(1)) title_tag = soup.select_one('td#dates_header > table u') show_title = str(title_tag.contents[0]).strip() if title_tag else None subtitles = [] # loop over episode rows for subs_tag in soup.select('.movie-details'): # read common info version = subs_tag.find('span').text download_link = self.server_url + subs_tag.find('a')['href'] uploader = subs_tag.select_one('.movie-info').find('p').find('a').text language_code = subs_tag.select_one('.sprite')['class'][1].split('gif')[0] language = Language.fromietf(language_code) subtitle = self.subtitle_class(language, page_link, show_title, year, version, download_link, uploader) logger.debug('Found subtitle {!r}'.format(subtitle)) subtitles.append(subtitle) return subtitles def list_subtitles(self, video, languages): # lookup show_id titles = [video.title] + video.alternative_titles if isinstance(video, Movie) else [] show_links = None for title in titles: show_links = self.get_show_links(title, video.year) if show_links: break subtitles = [] # query for subtitles with the show_id if show_links: for show_link in show_links: subtitles += [s for s in self.query(show_link, video.title, video.year) if s.language in languages] else: subtitles += [s for s in self.query(None, sanitize(video.title), video.year) if s.language in languages] return subtitles def download_subtitle(self, subtitle): if isinstance(subtitle, Subs4FreeSubtitle): logger.info('Downloading subtitle %r', subtitle) async def fetch_subtitle(): retries = 3 for attempt in range(retries): try: async with websockets.connect(browserless_ws_endpoint, extra_headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}) as websocket: # Establish session # Establish session session_payload = { 'id': 1, 'method': 'Target.createBrowserContext' } logger.debug(f"Sending session payload: {json.dumps(session_payload)}") await websocket.send(json.dumps(session_payload)) session_response = await websocket.recv() logger.debug(f"Received session response: {session_response}") session_data = json.loads(session_response) if 'error' in session_data: logger.error(f"Error in session response: {session_data['error']}") return session_id = session_data.get('result', {}).get('browserContextId') if not session_id: logger.debug('Unable to establish session. No session ID returned') return # Send navigation payload navigation_payload = { 'id': 2, 'method': 'Page.navigate', 'sessionId': session_id, 'params': { 'url': subtitle.download_link, 'headers': {'Referer': subtitle.page_link} } } logger.debug(f"Sending navigation payload: {json.dumps(navigation_payload)}") await websocket.send(json.dumps(navigation_payload)) navigation_response = await websocket.recv() logger.debug(f"Received navigation response: {navigation_response}") navigation_data = json.loads(navigation_response) if 'error' in navigation_data: logger.error(f"Error in navigation response: {navigation_data['error']}") return if not navigation_data.get('result'): logger.debug('Unable to download subtitle. No data returned from provider') return archive = _get_archive(navigation_data['result'].encode('utf-8')) subtitle_content = _get_subtitle_from_archive(archive) if archive else navigation_data['result'] if subtitle_content: subtitle.content = fix_line_ending(subtitle_content) else: logger.debug('Could not extract subtitle from %r', archive) break except (socket.gaierror, websockets.exceptions.InvalidURI, websockets.exceptions.ConnectionClosedError) as e: logger.error(f"Attempt {attempt + 1} failed: {e}") if attempt < retries - 1: await asyncio.sleep(5) # wait before retrying else: logger.error(f"All {retries} attempts failed. Giving up.") return asyncio.run(fetch_subtitle()) def apply_anti_block(self, subtitle): try: logger.debug('Requesting anti-block URL: %s', self.anti_block_1) self.session.get(self.anti_block_1, headers={'Referer': subtitle.download_link}, timeout=10) logger.debug('Requesting anti-block URL: %s', self.anti_block_2) self.session.get(self.anti_block_2, headers={'Referer': subtitle.download_link}, timeout=10) logger.debug('Requesting anti-block URL: %s', self.anti_block_3) self.session.get(self.anti_block_3, headers={'Referer': subtitle.download_link}, timeout=10) except socket.gaierror as e: logger.error(f"DNS resolution failed for anti-block URLs: {e}") def _get_archive(content): # open the archive archive_stream = io.BytesIO(content) archive = None if rarfile.is_rarfile(archive_stream): logger.debug('Identified rar archive') archive = rarfile.RarFile(archive_stream) elif zipfile.is_zipfile(archive_stream): logger.debug('Identified zip archive') archive = zipfile.ZipFile(archive_stream) return archive def _get_subtitle_from_archive(archive): for name in archive.namelist(): # discard hidden files if os.path.split(name)[-1].startswith('.'): continue # discard non-subtitle files if not name.lower().endswith(SUBTITLE_EXTENSIONS): continue return archive.read(name) return None