2025-03-02 19:12:50 +01:00

182 lines
6.2 KiB
Python

import os
import time
import urllib.request
from abc import ABC, abstractmethod
from urllib.parse import urlencode
from urllib.request import Request
from selenium import webdriver
from selenium.common.exceptions import (
NoSuchElementException,
SessionNotCreatedException,
StaleElementReferenceException,
TimeoutException,
WebDriverException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
class Downloader(ABC):
    """Abstract base for objects that fetch the text content of a URL.

    Subclasses implement :meth:`download`. :meth:`get_content` wraps it with
    an optional on-disk cache.
    """

    def __init__(self):
        # Capability flag: the base class leaves it False and subclasses may
        # switch it on. Its consumers live outside this class.
        self.support_2nd_extract = False

    @abstractmethod
    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Fetch *url* and return its content as a string.

        The signature mirrors the keyword arguments that :meth:`get_content`
        forwards, so every concrete downloader accepts the same call.

        Args:
            url: Address of the resource to fetch.
            referer: Optional value for the ``Referer`` header.
            post: Optional mapping of form fields to send.
            content_type: Optional value for the ``Content-Type`` header.
            data: Optional raw request body.
        """

    def get_content(
        self,
        url,
        cache=None,
        referer=None,
        post=None,
        content_type=None,
        data=None,
    ):
        """Return the content of *url*, using *cache* as a file cache if given.

        When *cache* names an existing file, its text is returned and nothing
        is downloaded. Otherwise the content is obtained from
        :meth:`download` and, if *cache* is set, written to that path (parent
        directories are created as needed).

        Args:
            url: Address of the resource to fetch.
            cache: Optional path of the cache file; falsy disables caching.
            referer: Forwarded to :meth:`download`.
            post: Forwarded to :meth:`download`.
            content_type: Forwarded to :meth:`download`.
            data: Forwarded to :meth:`download`.

        Returns:
            The cached or freshly downloaded content.
        """
        if cache and os.path.exists(cache):
            print("Loading cache ({})".format(cache))
            # read() keeps the text as stored. Joining readlines() with "\n"
            # would double every line break, because each line already ends
            # with its own newline.
            with open(cache, encoding="utf-8") as cache_file:
                return cache_file.read()

        content = self.download(
            url,
            referer=referer,
            post=post,
            content_type=content_type,
            data=data,
        )

        if cache:
            print("Saving cache ({})".format(cache))
            cache_dir = os.path.dirname(cache)
            if cache_dir:
                os.makedirs(cache_dir, exist_ok=True)
            # Explicit UTF-8 so pages with non-ASCII characters can be cached
            # whatever the process locale is.
            with open(cache, "w", encoding="utf-8") as cache_file:
                cache_file.write(content)
        return content
class SimpleDownloader(Downloader):
    """Downloader that fetches resources with :mod:`urllib.request`."""

    def __init__(self):
        super().__init__()

    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Fetch *url* and return the decoded response body.

        Args:
            url: Address of the resource to fetch.
            referer: Optional value for the ``Referer`` header.
            post: Optional mapping of form fields. When truthy it is
                URL-encoded and sent as the request body, taking precedence
                over *data*.
            content_type: Optional value for the ``Content-Type`` header.
            data: Optional raw request body, used when *post* is falsy.

        Returns:
            The response body decoded with the charset announced by the
            response headers, or with the default codec when none is given.

        Raises:
            Exception: On any failure. The message carries the first 64
                characters of the original error, which is chained as
                ``__cause__``.
        """
        print("Downloading {} referer: {} post: {}".format(url, referer, post))
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0",
            }
            if referer is not None:
                headers["Referer"] = referer
            if content_type is not None:
                headers["Content-Type"] = content_type

            # Form fields win over a raw body when both are supplied.
            body = urlencode(post).encode("utf-8") if post else data
            req = Request(url, headers=headers, data=body)

            # The context manager closes the connection even if reading fails.
            with urllib.request.urlopen(req) as resource:
                charset = resource.headers.get_content_charset()
                raw = resource.read()

            return raw.decode(charset) if charset else raw.decode()
        except Exception as e:
            print(e)
            raise Exception("Error during download: " + str(e)[:64] + "...") from e
class ChromiumHeadlessDownloader(Downloader):
    """Downloader that renders pages in a headless Chrome driven by Selenium.

    Only plain GET navigation is supported: :meth:`download` rejects the
    ``post``, ``referer``, ``data`` and ``content_type`` arguments.
    """

    # Defaults used when the matching constructor argument is not given.
    DEFAULT_DRIVER_PATH = "/usr/bin/chromedriver"
    DEFAULT_PROXY_SERVER = "socks5://127.0.0.1:12345"
    # Seconds to wait after navigation when ``pause`` is enabled.
    PAUSE_SECONDS = 2

    def __init__(
        self,
        pause=True,
        noimage=True,
        proxy=False,
        driver_path=None,
        proxy_server=None,
    ):
        """Configure Chrome options and start the browser session.

        Args:
            pause: When true, sleep ``PAUSE_SECONDS`` after each navigation.
            noimage: When true, set the Chrome preference that blocks image
                loading.
            proxy: When true, route traffic through *proxy_server*.
            driver_path: Path of the chromedriver executable; defaults to
                ``DEFAULT_DRIVER_PATH``.
            proxy_server: Value for ``--proxy-server``; defaults to
                ``DEFAULT_PROXY_SERVER``.
        """
        super().__init__()
        self.support_2nd_extract = True
        self.pause = pause
        self.proxy = proxy
        # Defined before the browser starts so __del__ stays safe if the
        # session cannot be created.
        self.driver = None

        self.options = Options()
        for argument in (
            "--headless=new",
            "--disable-dev-shm-usage",
            "--no-sandbox",
            "start-maximized",
            "enable-automation",
            "--disable-browser-side-navigation",
            "--disable-gpu",
        ):
            self.options.add_argument(argument)
        if self.proxy:
            self.options.add_argument(
                "--proxy-server="
                + (proxy_server if proxy_server is not None else self.DEFAULT_PROXY_SERVER)
            )
        if noimage:
            self.options.add_experimental_option(
                "prefs",
                {
                    # block image loading
                    "profile.managed_default_content_settings.images": 2,
                },
            )

        self.service = Service(
            driver_path if driver_path is not None else self.DEFAULT_DRIVER_PATH
        )
        self.driver = webdriver.Chrome(service=self.service, options=self.options)

    def __del__(self):
        # __del__ also runs when __init__ raised, so the driver may be
        # missing or None.
        driver = getattr(self, "driver", None)
        if driver is not None:
            driver.quit()

    def _load(self, url):
        """Navigate to *url* and apply the optional post-load pause."""
        self.driver.get(url)
        if self.pause:
            time.sleep(self.PAUSE_SECONDS)

    def screenshot(self, url, path_image):
        """Load *url* and save a screenshot to *path_image*.

        Returns:
            True on success, False if any exception occurred (best effort:
            the failure is only reported on stdout).
        """
        print("Screenshot {}".format(url))
        try:
            self._load(url)
            self.driver.save_screenshot(path_image)
        except Exception:
            print(f">> Exception: {url}")
            return False
        return True

    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Load *url* in the browser and return the page source.

        Args:
            url: Address of the page to load.
            referer: Not supported; must be falsy.
            post: Not supported; must be falsy.
            content_type: Not supported; must be falsy.
            data: Not supported; must be falsy.

        Returns:
            The page source reported by the driver after loading.

        Raises:
            Exception: If an unsupported argument is given, or if loading
                fails. In the latter case the message carries the first 64
                characters of the original error, which is chained as
                ``__cause__``.
        """
        if post:
            raise Exception("POST method with Chromium headless not yet implemented")
        if referer:
            raise Exception(
                "Referer parameter with Chromium headless not yet implemented"
            )
        if data:
            raise Exception("Data content with Chromium headless not yet implemented")
        if content_type:
            raise Exception(
                "Content-type parameter with Chromium headless not yet implemented"
            )
        print("Download {}".format(url))

        try:
            self._load(url)
            doc = self.driver.page_source
        except WebDriverException as e:
            # WebDriverException is the base class of the Selenium errors
            # (stale element, no such element, timeout, session not created),
            # so a single handler covers them all.
            print(f">> {type(e).__name__}: {e.args}")
            raise Exception("Error during download: " + str(e)[:64] + "...") from e
        except Exception as e:
            print(
                f">> {type(e).__name__} line {e.__traceback__.tb_lineno} of {__file__}: {e.args}"
            )
            raise Exception("Error during download: " + str(e)[:64] + "...") from e

        return doc