Skip to content

Commit

Permalink
Merge branch 'main' into check-already-logged-in
Browse files Browse the repository at this point in the history
  • Loading branch information
sebthom authored Sep 17, 2023
2 parents 85ca945 + 79dc665 commit 8f9510f
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 120 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ __pycache__
/dist
/.eggs
/*.egg-info
/.mypy_cache
/.pdm-python

# Eclipse
Expand Down
21 changes: 13 additions & 8 deletions kleinanzeigen_bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ruamel.yaml import YAML
from selenium.common.exceptions import ElementClickInterceptedException, NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC

from . import utils, resources, extract # pylint: disable=W0406
Expand Down Expand Up @@ -423,6 +424,8 @@ def delete_ad(self, ad_cfg: dict[str, Any]) -> bool:
self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
csrf_token_elem = self.web_find(By.XPATH, "//meta[@name='_csrf']")
csrf_token = csrf_token_elem.get_attribute("content")
if csrf_token is None:
raise AssertionError("Expected CSRF Token not found in HTML content!")

if self.delete_ads_by_title:
published_ads = json.loads(self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT")["content"])["ads"]
Expand Down Expand Up @@ -603,14 +606,14 @@ def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str,

# extract the ad id from the URL's query parameter
current_url_query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.webdriver.current_url).query)
ad_id = int(current_url_query_params.get("adId", None)[0])
ad_id = int(current_url_query_params.get("adId", [])[0])
ad_cfg_orig["id"] = ad_id

LOG.info(" -> SUCCESS: ad published with ID %s", ad_id)

utils.save_dict(ad_file, ad_cfg_orig)

def __set_category(self, ad_file:str, ad_cfg: dict[str, Any]):
def __set_category(self, ad_file:str, ad_cfg: dict[str, Any]) -> None:
# click on something to trigger automatic category detection
self.web_click(By.ID, "pstad-descrptn")

Expand Down Expand Up @@ -697,7 +700,7 @@ def __set_shipping_options(self, ad_cfg: dict[str, Any]) -> None:
except NoSuchElementException as ex:
LOG.debug(ex, exc_info = True)

def __upload_images(self, ad_cfg: dict[str, Any]):
def __upload_images(self, ad_cfg: dict[str, Any]) -> None:
LOG.info(" -> found %s", pluralize("image", ad_cfg["images"]))
image_upload = self.web_find(By.XPATH, "//input[@type='file']")

Expand Down Expand Up @@ -805,7 +808,7 @@ def download_images_from_ad_page(self, directory:str, ad_id:int, logger:logging.
n_images = 1

# determine number of images (1 ... N)
next_button = None
next_button:WebElement
try: # check if multiple images given
# edge case: 'Virtueller Rundgang' div could be found by same CSS class
element_candidates = image_box.find_elements(By.CSS_SELECTOR, '.galleryimage--info')
Expand All @@ -824,6 +827,8 @@ def download_images_from_ad_page(self, directory:str, ad_id:int, logger:logging.
dl_counter = 0
while img_nr <= n_images: # scrolling + downloading
current_img_url = img_element.get_attribute('src') # URL of the image
if current_img_url is None:
continue
file_ending = current_img_url.split('.')[-1].lower()
img_path = directory + '/' + img_fn_prefix + str(img_nr) + '.' + file_ending
if current_img_url.startswith('https'): # verify https (for Bandit linter)
Expand All @@ -850,15 +855,15 @@ def download_images_from_ad_page(self, directory:str, ad_id:int, logger:logging.

return img_paths

def extract_ad_page_info(self, directory:str, id_:int) -> dict:
def extract_ad_page_info(self, directory:str, id_:int) -> dict[str, Any]:
"""
Extracts all necessary information from an ad's page.
:param directory: the path of the ad's previously created directory
:param id_: the ad ID, already extracted by a calling function
:return: a dictionary with the keys as given in an ad YAML, and their respective values
"""
info = {'active': True}
info:dict[str, Any] = {'active': True}

# extract basic info
if 's-anzeige' in self.webdriver.current_url:
Expand Down Expand Up @@ -912,7 +917,7 @@ def extract_ad_page_info(self, directory:str, id_:int) -> dict:

return info

def download_ad_page(self, id_:int):
def download_ad_page(self, id_:int) -> None:
"""
Downloads an ad to a specific location, specified by config and ad ID.
NOTE: Requires that the driver session currently is on the ad page.
Expand All @@ -939,7 +944,7 @@ def download_ad_page(self, id_:int):
ad_file_path = new_base_dir + '/' + f'ad_{id_}.yaml'
utils.save_dict(ad_file_path, info)

def start_download_routine(self):
def start_download_routine(self) -> None:
"""
Determines which download mode was chosen with the arguments, and calls the specified download routine.
This downloads either all, only unsaved (new), or specific ads given by ID.
Expand Down
16 changes: 8 additions & 8 deletions kleinanzeigen_bot/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
"""
import json
from decimal import DecimalException
from typing import Any

import selenium.webdriver.support.expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
import selenium.webdriver.support.expected_conditions as EC

from .selenium_mixin import SeleniumMixin
from .utils import parse_decimal, pause
Expand Down Expand Up @@ -39,7 +40,7 @@ def extract_category_from_ad_page(self) -> str:

return category

def extract_special_attributes_from_ad_page(self) -> dict:
def extract_special_attributes_from_ad_page(self) -> dict[str, Any]:
"""
Extracts the special attributes from an ad page.
Expand All @@ -56,7 +57,7 @@ def extract_special_attributes_from_ad_page(self) -> dict:
special_attributes = {k: v for k, v in special_attributes.items() if not k.endswith('.versand_s')}
return special_attributes

def extract_pricing_info_from_ad_page(self) -> (float | None, str):
def extract_pricing_info_from_ad_page(self) -> tuple[float | None, str]:
"""
Extracts the pricing information (price and pricing type) from an ad page.
Expand Down Expand Up @@ -85,7 +86,7 @@ def extract_pricing_info_from_ad_page(self) -> (float | None, str):
except NoSuchElementException: # no 'commercial' ad, has no pricing box etc.
return None, 'NOT_APPLICABLE'

def extract_shipping_info_from_ad_page(self) -> (str, float | None, list | None):
def extract_shipping_info_from_ad_page(self) -> tuple[str, float | None, list[str] | None]:
"""
Extracts shipping information from an ad page.
Expand All @@ -102,9 +103,8 @@ def extract_shipping_info_from_ad_page(self) -> (str, float | None, list | None)
ship_type = 'SHIPPING'
elif '€' in shipping_text:
shipping_price_parts = shipping_text.split(' ')
shipping_price = float(parse_decimal(shipping_price_parts[-2]))
ship_type = 'SHIPPING'
ship_costs = shipping_price
ship_costs = float(parse_decimal(shipping_price_parts[-2]))

# extract shipping options
# It is only possible to extract the cheapest shipping option,
Expand Down Expand Up @@ -140,13 +140,13 @@ def extract_sell_directly_from_ad_page(self) -> bool | None:
except NoSuchElementException:
return None

def extract_contact_from_ad_page(self) -> dict:
def extract_contact_from_ad_page(self) -> dict[str, (str | None)]:
"""
Processes the address part involving street (optional), zip code + city, and phone number (optional).
:return: a dictionary containing the address parts with their corresponding values
"""
contact = {}
contact:dict[str, (str | None)] = {}
address_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-locality')
address_text = address_element.text.strip()
# format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
Expand Down
45 changes: 28 additions & 17 deletions kleinanzeigen_bot/selenium_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,35 @@
"""
import logging, os, shutil, time
from collections.abc import Callable, Iterable
from typing import Any, Final
from typing import Any, Final, TypeVar

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService, DEFAULT_EXECUTABLE_PATH as DEFAULT_CHROMEDRIVER_PATH
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chromium.options import ChromiumOptions
from selenium.webdriver.chromium.webdriver import ChromiumDriver
from selenium.webdriver.edge.service import Service as EdgeService, DEFAULT_EXECUTABLE_PATH as DEFAULT_EDGEDRIVER_PATH
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.expected_conditions import AnyDriver
from selenium.webdriver.support.ui import Select, WebDriverWait
import selenium_stealth
import webdriver_manager.core
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.driver_cache import DriverCacheManager
from webdriver_manager.core.manager import DriverManager
from webdriver_manager.core.os_manager import ChromeType, OSType, OperationSystemManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from webdriver_manager.core.utils import ChromeType, OSType

from .utils import ensure, pause, T

LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin")

DEFAULT_CHROMEDRIVER_PATH = "chromedriver"
DEFAULT_EDGEDRIVER_PATH = "msedgedriver"


class BrowserConfig:

Expand All @@ -39,13 +45,16 @@ def __init__(self) -> None:
self.profile_name:str = ""


CHROMIUM_OPTIONS = TypeVar('CHROMIUM_OPTIONS', bound = ChromiumOptions) # pylint: disable=invalid-name


class SeleniumMixin:

def __init__(self) -> None:
self.browser_config:Final[BrowserConfig] = BrowserConfig()
self.webdriver:WebDriver = None

def _init_browser_options(self, browser_options:ChromiumOptions) -> ChromiumOptions:
def _init_browser_options(self, browser_options:CHROMIUM_OPTIONS) -> CHROMIUM_OPTIONS:
if self.browser_config.use_private_window:
if isinstance(browser_options, webdriver.EdgeOptions):
browser_options.add_argument("-inprivate")
Expand Down Expand Up @@ -123,8 +132,9 @@ def create_webdriver_session(self, *, use_preinstalled_webdriver:bool = True) ->
webdriver_manager.core.driver.get_browser_version_from_os = lambda _: chrome_major_version

# download and install matching chrome driver
webdriver_mgr: DriverManager
if chrome_type == ChromeType.MSEDGE:
webdriver_mgr = EdgeChromiumDriverManager(cache_valid_range = 14)
webdriver_mgr = EdgeChromiumDriverManager(cache_manager = DriverCacheManager(valid_range = 14))
webdriver_path = webdriver_mgr.install()
env = os.environ.copy()
env["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
Expand All @@ -133,7 +143,7 @@ def create_webdriver_session(self, *, use_preinstalled_webdriver:bool = True) ->
options = self._init_browser_options(webdriver.EdgeOptions())
)
else:
webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_valid_range = 14)
webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_manager = DriverCacheManager(valid_range = 14))
webdriver_path = webdriver_mgr.install()
self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = self._init_browser_options(webdriver.ChromeOptions()))

Expand All @@ -148,8 +158,8 @@ def create_webdriver_session(self, *, use_preinstalled_webdriver:bool = True) ->

LOG.info("New WebDriver session is: %s %s", self.webdriver.session_id, self.webdriver.command_executor._url) # pylint: disable=protected-access

def get_browser_version(self, executable_path: str) -> tuple[ChromeType, str]:
match webdriver_manager.core.utils.os_name():
def get_browser_version(self, executable_path: str) -> tuple[ChromeType, str]: # -> [ chrome_type, chrome_version ]
match OperationSystemManager.get_os_name():
case OSType.WIN:
import win32api # pylint: disable=import-outside-toplevel,import-error
# pylint: disable=no-member
Expand All @@ -175,25 +185,25 @@ def get_browser_version(self, executable_path: str) -> tuple[ChromeType, str]:
if "chromium" in filename:
return (
ChromeType.CHROMIUM,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.utils.PATTERN[ChromeType.CHROMIUM])
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.CHROMIUM])
)
if "edge" in filename:
return (
ChromeType.MSEDGE,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.utils.PATTERN[ChromeType.MSEDGE])
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.MSEDGE])
)
return (
ChromeType.GOOGLE,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.utils.PATTERN[ChromeType.GOOGLE])
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.GOOGLE])
)

def find_compatible_browser(self) -> tuple[str, ChromeType, str] | None:
match webdriver_manager.core.utils.os_name():
def find_compatible_browser(self) -> tuple[str, ChromeType, str] | None: # -> [ browser_path, chrome_type, chrome_version ]
match OperationSystemManager.get_os_name():
case OSType.LINUX:
browser_paths = [
shutil.which("chromium"),
shutil.which("chromium-browser"),
shutil.which("google-chome"),
shutil.which("google-chrome"),
shutil.which("microsoft-edge")
]

Expand Down Expand Up @@ -233,7 +243,7 @@ def find_compatible_browser(self) -> tuple[str, ChromeType, str] | None:
LOG.warning("Installed browser could not be detected")
return None

def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T:
def web_await(self, condition: Callable[[AnyDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T:
"""
Blocks/waits until the given condition is met.
Expand Down Expand Up @@ -305,6 +315,7 @@ def web_input(self, selector_type:By, selector_value:str, text:str, timeout:floa
input_field.clear()
input_field.send_keys(text)
pause()
return input_field

def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None:
"""
Expand Down Expand Up @@ -349,7 +360,7 @@ def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable
return response
# pylint: enable=dangerous-default-value

def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False):
def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
"""
Smoothly scrolls the current web page down.
Expand Down
8 changes: 4 additions & 4 deletions kleinanzeigen_bot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ def pluralize(word:str, count:int | Sized, prefix:bool = True) -> str:
'fields'
"""
if not hasattr(pluralize, "inflect"):
pluralize.inflect = inflect.engine()
pluralize.inflect = inflect.engine() # type: ignore[attr-defined] # mypy
if isinstance(count, Sized):
count = len(count)
plural:str = pluralize.inflect.plural_noun(word, count)
plural:str = pluralize.inflect.plural_noun(word, count) # type: ignore[attr-defined] # mypy
if prefix:
return f"{count} {plural}"
return plural
Expand Down Expand Up @@ -200,7 +200,7 @@ def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any]
return None

with open(filepath, encoding = "utf-8") as file:
return json.load(file) if filepath.endswith(".json") else YAML().load(file)
return json.load(file) if filepath.endswith(".json") else YAML().load(file) # type: ignore[no-any-return] # mypy


def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]:
Expand All @@ -214,7 +214,7 @@ def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "
raise ValueError(f'Unsupported file type. The file name "{filename}" must end with *.json, *.yaml, or *.yml')

content = get_resource_as_string(module, filename)
return json.loads(content) if filename.endswith(".json") else YAML().load(content)
return json.loads(content) if filename.endswith(".json") else YAML().load(content) # type: ignore[no-any-return] # mypy


def save_dict(filepath:str, content:dict[str, Any]) -> None:
Expand Down
Loading

0 comments on commit 8f9510f

Please sign in to comment.