185 lines
6.2 KiB
Python
185 lines
6.2 KiB
Python
import os
|
|
import time
|
|
import urllib.request
|
|
from abc import ABC, abstractmethod
|
|
from urllib.parse import urlencode
|
|
from urllib.request import Request
|
|
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import (
|
|
NoSuchElementException,
|
|
SessionNotCreatedException,
|
|
StaleElementReferenceException,
|
|
TimeoutException,
|
|
WebDriverException,
|
|
)
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.chrome.service import Service
|
|
|
|
|
|
class Downloader(ABC):
    """Abstract base for page downloaders, with an optional on-disk cache.

    Subclasses implement :meth:`download`; :meth:`get_content` wraps it with
    a file cache keyed by an explicit path supplied by the caller.
    """

    def __init__(self):
        # Capability flag; False in the base class, subclasses may set it
        # to True.
        # NOTE(review): the code that reads this flag is not in this file.
        self.support_2nd_extract = False

    @abstractmethod
    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Fetch ``url`` and return its content as text.

        The signature mirrors the keyword arguments that :meth:`get_content`
        forwards, so every concrete downloader accepts the same call.

        Args:
            url: Address of the resource to fetch.
            referer: Optional referer to send with the request.
            post: Optional form fields to submit.
            content_type: Optional content type of the request body.
            data: Optional raw request body.
        """

    def get_content(
        self,
        url,
        cache=None,
        referer=None,
        post=None,
        content_type=None,
        data=None,
    ):
        """Return the content of ``url``, reading or filling a file cache.

        Args:
            url: Address passed to :meth:`download` on a cache miss.
            cache: Optional path of the cache file. When it is set and the
                file exists, its content is returned and nothing is
                downloaded. When it is set and the file is missing, the
                downloaded content is written there (parent directories are
                created as needed).
            referer: Forwarded to :meth:`download`.
            post: Forwarded to :meth:`download`.
            content_type: Forwarded to :meth:`download`.
            data: Forwarded to :meth:`download`.

        Returns:
            The content as a string.
        """
        if cache and os.path.exists(cache):
            print("Loading cache ({})".format(cache))
            # Read the file verbatim: joining readlines() with "\n" would
            # double every line break, because readlines() keeps them.
            with open(cache, encoding="utf-8") as cached_file:
                return cached_file.read()

        content = self.download(
            url,
            referer=referer,
            post=post,
            content_type=content_type,
            data=data,
        )

        if cache:
            print("Saving cache ({})".format(cache))
            cache_dir = os.path.dirname(cache)
            # An empty dirname means the cache lives in the current directory.
            if cache_dir:
                os.makedirs(cache_dir, exist_ok=True)
            with open(cache, "w", encoding="utf-8") as text_file:
                text_file.write(content)
        return content
|
|
|
|
|
|
class SimpleDownloader(Downloader):
    """Downloader that fetches pages with ``urllib.request``."""

    def __init__(self):
        super().__init__()

    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Fetch ``url`` and return the decoded response body.

        Args:
            url: Address to request.
            referer: Optional value for the ``Referer`` header.
            post: Optional mapping of form fields. When non-empty it is
                URL-encoded and sent as the request body, taking precedence
                over ``data``.
            content_type: Optional value for the ``Content-Type`` header.
            data: Optional raw request body (bytes) attached to the request.

        Returns:
            The response body decoded with the charset announced by the
            server, or with ``bytes.decode()``'s default when none is given.

        Raises:
            Exception: Any failure while building, sending, or decoding the
                request. The message carries the first 64 characters of the
                original error, which is chained as ``__cause__``.
        """
        print("Downloading {} referer: {} post: {}".format(url, referer, post))
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0",
            }
            if referer is not None:
                headers["Referer"] = referer
            if content_type is not None:
                headers["Content-Type"] = content_type
            req = Request(url, headers=headers, data=data)

            # Passing None as the body leaves the request's own data in place.
            form_body = urlencode(post).encode("utf-8") if post else None

            # The context manager closes the connection even if reading fails.
            with urllib.request.urlopen(req, form_body) as resource:
                charset = resource.headers.get_content_charset()
                raw = resource.read()

            return raw.decode(charset) if charset else raw.decode()

        except Exception as e:
            print(e)
            raise Exception("Error during download: " + str(e)[:64] + "...") from e
|
|
|
|
|
|
class ChromiumHeadlessDownloader(Downloader):
    """Downloader that renders pages in a headless Chrome driven by Selenium.

    Only plain GET navigation is supported: :meth:`download` rejects the
    ``post``, ``referer``, ``data`` and ``content_type`` arguments.
    """

    # Seconds to wait after navigation when ``pause`` is enabled.
    PAUSE_SECONDS = 2

    def __init__(
        self,
        pause=True,
        noimage=True,
        proxy=False,
        *,
        driver_path="/usr/bin/chromedriver",
        proxy_server="socks5://127.0.0.1:12345",
    ):
        """Configure Chrome and start the WebDriver session.

        Args:
            pause: When true, sleep ``PAUSE_SECONDS`` after each navigation
                before reading the page or taking a screenshot.
            noimage: When true, set the Chrome preference that blocks image
                loading.
            proxy: When true, route traffic through ``proxy_server``.
            driver_path: Path of the chromedriver executable.
            proxy_server: Value for Chrome's ``--proxy-server`` switch, used
                only when ``proxy`` is true.
        """
        super().__init__()
        self.support_2nd_extract = True

        # Defined before anything can fail so that __del__ always finds it.
        self.driver = None

        self.pause = pause
        self.proxy = proxy
        self.options = Options()
        for argument in (
            "--headless=new",
            "--disable-dev-shm-usage",
            "--no-sandbox",
            "start-maximized",
            "enable-automation",
            "--disable-browser-side-navigation",
            "--disable-gpu",
        ):
            self.options.add_argument(argument)
        if self.proxy:
            self.options.add_argument("--proxy-server=" + proxy_server)

        if noimage:
            self.options.add_experimental_option(
                "prefs",
                {
                    # block image loading
                    "profile.managed_default_content_settings.images": 2,
                },
            )

        self.service = Service(driver_path)
        self.driver = webdriver.Chrome(service=self.service, options=self.options)

    def __del__(self):
        """Quit the browser session, if one was started."""
        # getattr guards against __init__ having failed before the attribute
        # was assigned.
        driver = getattr(self, "driver", None)
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            print("Error: " + str(e))

    def _load(self, url):
        """Navigate to ``url`` and apply the optional post-load pause."""
        self.driver.get(url)
        if self.pause:
            time.sleep(self.PAUSE_SECONDS)

    def screenshot(self, url, path_image):
        """Load ``url`` and save a screenshot to ``path_image``.

        This is best-effort: any error is reported on stdout and turned into
        a ``False`` return value instead of being raised.

        Returns:
            True when the screenshot call completed, False on any exception.
        """
        print("Screenshot {}".format(url))
        try:
            self._load(url)
            self.driver.save_screenshot(path_image)
        except Exception:
            print(f">> Exception: {url}")
            return False

        return True

    def download(self, url, referer=None, post=None, content_type=None, data=None):
        """Load ``url`` in the browser and return the page source.

        Args:
            url: Address to navigate to.
            referer: Unsupported; must be falsy.
            post: Unsupported; must be falsy.
            content_type: Unsupported; must be falsy.
            data: Unsupported; must be falsy.

        Returns:
            The value of the driver's ``page_source`` after navigation.

        Raises:
            NotImplementedError: If any unsupported argument is truthy.
            Exception: If navigation or reading the page fails. The message
                carries the first 64 characters of the original error, which
                is chained as ``__cause__``.
        """
        if post:
            raise NotImplementedError(
                "POST method with Chromium headless not yet implemented"
            )
        if referer:
            raise NotImplementedError(
                "Referer parameter with Chromium headless not yet implemented"
            )
        if data:
            raise NotImplementedError(
                "Data content with Chromium headless not yet implemented"
            )
        if content_type:
            raise NotImplementedError(
                "Content-type parameter with Chromium headless not yet implemented"
            )
        print("Download {}".format(url))

        try:
            self._load(url)
            doc = self.driver.page_source

        # The named exceptions all derive from WebDriverException; they are
        # listed to document the failures expected here. One handler is
        # enough because they are all reported the same way.
        except (
            StaleElementReferenceException,
            NoSuchElementException,
            TimeoutException,
            SessionNotCreatedException,
            WebDriverException,
        ) as e:
            print(f">> {type(e).__name__}: {e.args}")
            raise Exception("Error during download: " + str(e)[:64] + "...") from e
        except Exception as e:
            # Unexpected error: also report where it surfaced.
            print(
                f">> {type(e).__name__} line {e.__traceback__.tb_lineno} of {__file__}: {e.args}"
            )
            raise Exception("Error during download: " + str(e)[:64] + "...") from e

        return doc
|