Адаптированный перевод статьи Building a Concurrent Web Scraper with Python and Selenium.

В этом посте мы разберем вопрос, как ускорить выполнение скрипта Python при парсинге информации с веб-страниц, используя модуль concurrent.futures для реализации многопоточности его выполнения. Также мы разберем сам сценарий и покажем, как проверить корректность его работы с помощью pytest.

После прочтения этой статьи вы сможете:

  1. Собирать информацию и перемещаться по страницам веб-сайтов с помощью Selenium и Beautiful Soup
  2. Настроить pytest для проверки функциональности процедур вашего кода.
  3. Запускать парсер для параллельного выполнения запросов, используя модуль concurrent.futures.
  4. Настроить драйвер ChromeDriver Selenium для работы в headless режиме работы.

Настройка проекта

Сначала клонируем репозиторий с кодом нашего проекта. Для этого из командной строки выполним следующие команды:

$ git clone git@github.com:testdrivenio/concurrent-web-scraping.git
$ cd concurent-web-scraping
$ python3.8 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt

Приведенные выше команды могут отличаться в зависимости от вашей среды разработки.

Установите ChromeDriver глобально. (Мы же используем версию 85.0.4183.87).

Разбор работы скрипта

Скрипт перебирает и собирает информацию с первых 20 страниц сайта Hacker News о последних статьях, используя Selenium для автоматизации взаимодействия с сайтом и Beautiful Soup для анализа HTML.

import datetime
import sys
from time import sleep, time

from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file


def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")


if __name__ == "__main__":

    # используется headless mod?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # устанавливаем значения вспомоготельных переменных
    start_time = time()
    current_page = 1
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"

    # инициализируем веб драйвер
    browser = get_driver(headless=headless)

    # перебираем страницы и собираем нужную информацию
    while current_page <= 20:
        print(f"Scraping page #{current_page}...")
        run_process(current_page, output_filename, browser)
        current_page = current_page + 1

    # завершаем работу скрипта
    browser.quit()
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

Начнем с основного файла. Вначале определим функцию run_process в которой будет запущен основной цикл работы нашего скрипта. Затем проверяем будет ли ChromeDriver работать в headless режиме, после чего определим и инициализируем несколько переменных, а также инициализируем сам драйвер с помощью метода get_driver(), импортированного из файла scrapers/scraper.py. В цикле while перебираем страницы сайта с которых будем собирать нужные нам данные. Для этого вызывается функция run_process(), которая непосредственно управляет соединением WebDriver и вызовами, соответствующих функций для сбора информации со страниц. Для этого в run_process() передается экземпляр WebDriver, а также номер страницы, которые передаются в функцию connect_to_base(), которая импортируется из файла scraper.py, код которого приводится ниже.

import csv
from pathlib import Path

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

BASE_DIR = Path(__file__).resolve(strict=True).parent.parent

def get_driver(headless):
    options = webdriver.ChromeOptions()
    if headless:
        options.add_argument("--headless")

    # инициализируем драйвер с нужными опциями
    driver = webdriver.Chrome(chrome_options=options)
    return driver


def connect_to_base(browser, page_number):
    base_url = f"https://news.ycombinator.com/news?p={page_number}"
    connection_attempts = 0
    while connection_attempts < 3:
        try:
            browser.get(base_url)
            # ожидаем пока элемент table с id = 'hnmain' будет загружен на страницу
            # затем функция вернет True иначе False 
            WebDriverWait(browser, 5).until(
                EC.presence_of_element_located((By.ID, "hnmain"))
            )
            return True
        except Exception as e:
            print(e)
            connection_attempts += 1
            print(f"Error connecting to {base_url}.")
            print(f"Attempt #{connection_attempts}.")
    return False


def parse_html(html):
    # создадим новый объект soup
    soup = BeautifulSoup(html, "html.parser")
    output_list = []
    # ищем в объекте  soup object id, rank, score и title статьи
    tr_blocks = soup.find_all("tr", class_="athing")
    article = 0
    for tr in tr_blocks:
        article_id = tr.get("id")
        article_url = tr.find_all("a")[1]["href"]
        # определяем, что статья новая
        if "item?id=" in article_url:
            article_url = f"https://news.ycombinator.com/{article_url}"
        load_time = get_load_time(article_url)
        try:
            score = soup.find(id=f"score_{article_id}").string
        except Exception as e:
            print(e)
            score = "0 points"
        article_info = {
            "id": article_id,
            "load_time": load_time,
            "rank": tr.span.string,
            "score": score,
            "title": tr.find(class_="storylink").string,
            "url": article_url,
        }
        # добавляем информацию о статье в список
        output_list.append(article_info)
        article += 1
    return output_list


def get_load_time(article_url):
    try:
        # устанавливаем значения заголовков запроса
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
        }
        # делаем запрос по url статьи article_url
        response = requests.get(
            article_url, headers=headers, stream=True, timeout=3.000
        )
        # получаем время загрузки страницы
        load_time = response.elapsed.total_seconds()
    except Exception as e:
        print(e)
        load_time = "Loading Error"
    return load_time


def write_to_file(output_list, filename):
    for row in output_list:
        with open(Path(BASE_DIR).joinpath(filename), "a") as csvfile:
            fieldnames = ["id", "load_time", "rank", "score", "title", "url"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerow(row)

Функция connect_to_base() пытается подключиться к сайту Hacker News, а затем использует функционал явного ожидания explicit wait Selenium, чтобы убедиться, что элемент с id = 'hnmain' загружен на страницу, прежде чем продолжить и получить из него нужные нам данные.

Просмотрите документацию Selenium для получения дополнительной информации о порядке использования explicit wait.

if connect_to_base(browser, page_number):
        ########
        # здесь может быть код имитирующие поведение пользователя на странице #
        ########
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

Чтобы подражать человеку-пользователю, вызывается метод sleep(2) который вносит задержку на 2 секунды после того, как драйвер подключился к Hacker News. После загрузки страницы и выполнения sleep(2), драйвер захватывает HTML код страницы, который затем передается в функцию parse_html().

def parse_html(html):
    # создадим новый объект soup
    soup = BeautifulSoup(html, "html.parser")
    output_list = []
    # ищем в объекте  soup object id, rank, score и title статьи
    tr_blocks = soup.find_all("tr", class_="athing")
    article = 0
    for tr in tr_blocks:
        article_id = tr.get("id")
        article_url = tr.find_all("a")[1]["href"]
        # определяем, что статья новая
        if "item?id=" in article_url:
            article_url = f"https://news.ycombinator.com/{article_url}"
        load_time = get_load_time(article_url)
        try:
            score = soup.find(id=f"score_{article_id}").string
        except Exception as e:
            print(e)
            score = "0 points"
        article_info = {
            "id": article_id,
            "load_time": load_time,
            "rank": tr.span.string,
            "score": score,
            "title": tr.find(class_="storylink").string,
            "url": article_url,
        }
        # добавляем информацию о статье в список
        output_list.append(article_info)
        article += 1
    return output_list

В свою очередь функция parse_html() использует возможности библиотеки Beautiful Soup для синтаксического разбора HTML кода страницы для сбора нужной информации со страниц, которая будет помещена в список dicts. Эта функция также передает URL адрес статьи в функцию get_load_time().

def get_load_time(article_url):
    try:
        # устанавливаем значения заголовков запроса
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
        }
        # делаем запрос по url статьи article_url
        response = requests.get(
            article_url, headers=headers, stream=True, timeout=3.000
        )
        # получаем время загрузки страницы
        load_time = response.elapsed.total_seconds()
    except Exception as e:
        print(e)
        load_time = "Loading Error"
    return load_time

Эта функция загружает страницу статьи по переданному в нее URL адресу article_url, а также запоминает время ее загрузки.

Полученная информация о статье добавляется в CSV файл после передачи списка с полученными данными о статьях в функцию write_to_file.

def write_to_file(output_list, filename):
    for row in output_list:
        with open(Path(BASE_DIR).joinpath(filename), "a") as csvfile:
            fieldnames = ["id", "load_time", "rank", "score", "title", "url"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerow(row)

После запуска скрипта нам потребовалось ожидать около 355 секунд (почти 6 минут) до окончания его работы:

(env)$ python script.py

Scraping page #1...
Scraping page #2...
Scraping page #3...
Scraping page #4...
Scraping page #5...
Scraping page #6...
Scraping page #7...
Scraping page #8...
Scraping page #9...
Scraping page #10...
Scraping page #11...
Scraping page #12...
Scraping page #13...
Scraping page #14...
Scraping page #15...
Scraping page #16...
Scraping page #17...
Scraping page #18...
Scraping page #19...
Scraping page #20...
Elapsed run time: 385.49500608444214 seconds

Имейте в виду, что контент может быть не на всех 20 страницах, поэтому прошедшее время может отличаться в вашем случае. Когда этот скрипт запускался, контент присутствовал на 18 страницах (около 530 записей).

Это достаточно большое время. Ну что ж, сначала добавим к нашему коду возможность базового тестирования.

Тестируем наш код

С целью проверить функциональность нашего парсера без запуска браузера и, таким образом, не повторять GET запросы к сайту Hacker News, вы можете загрузить HTML код страницы и сохранить его в папку test/test.html, а затем парсить его локальную копию. Это поможет избежать блокировки вашего IP-адреса из-за слишком быстрого выполнения большого количества запросов, в ходе отладки и тестирования функций парсинга данных, этот подход также сэкономит ваше время, поскольку вам не нужно запускать браузер при каждом запуске скрипта.

Ниже представлен код файла тестов, который находится в папке test/test_scraper.py:

from pathlib import Path

import pytest

from scrapers import scraper

BASE_DIR = Path(__file__).resolve(strict=True).parent


@pytest.fixture(scope="module")
def html_output():
    with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f:
        html = f.read()
        yield scraper.parse_html(html)


def test_output_is_not_none(html_output):
    assert html_output


def test_output_is_a_list(html_output):
    assert isinstance(html_output, list)


def test_output_is_a_list_of_dicts(html_output):
    assert all(isinstance(elem, dict) for elem in html_output)

Убедимся, что все работает как надо:

(env)$ python -m pytest test/test_scraper.py

================================ test session starts =================================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/michael/repos/testdriven/async-web-scraping
collected 3 items

test/test_scraper.py ...                                                       [100%]

================================= 3 passed in 20.10s =================================

Код выполнялся всего 20 секунд. Попробуем имитировать работу функции get_load_time(), исключив отправку GET запроса.

test / test_scraper_mock.py:

from pathlib import Path

import pytest

from scrapers import scraper

BASE_DIR = Path(__file__).resolve(strict=True).parent


@pytest.fixture(scope="function")
def html_output(monkeypatch):
    def mock_get_load_time(url):
        return "mocked!"

    monkeypatch.setattr(scraper, "get_load_time", mock_get_load_time)
    with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f:
        html = f.read()
        yield scraper.parse_html(html)


def test_output_is_not_none(html_output):
    assert html_output


def test_output_is_a_list(html_output):
    assert isinstance(html_output, list)


def test_output_is_a_list_of_dicts(html_output):
    assert all(isinstance(elem, dict) for elem in html_output)

Снова тестируем:

(env)$ python -m pytest test/test_scraper_mock.py

================================ test session starts =================================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/michael/repos/testdriven/async-web-scraping
collected 3 items

test/test_scraper.py ...                                                       [100%]

================================= 3 passed in 0.37s =================================

Настраиваем многопоточность

А теперь самое интересное! Внеся всего лишь несколько изменений в код нашего сценария, мы можем ускорить процесс его выполнения:

import datetime
import sys
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep, time

from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file


def run_process(page_number, filename, headless):

    # инициализируем веб драйвер
    browser = get_driver(headless)

    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)

        # закрываем соединие
        browser.quit()
    else:
        print("Error connecting to hacker news")
        browser.quit()


if __name__ == "__main__":

    # устанавливаем headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # инициализируем переменные
    start_time = time()
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"
    futures = []

    # перебираем страницы и получаем информацию о них
    with ThreadPoolExecutor() as executor:
        for number in range(1, 21):
            futures.append(
                executor.submit(run_process, number, output_filename, headless)
            )

    wait(futures)
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

В модуле concurrent.futures класс ThreadPoolExecutor используется для создания пула потоков и асинхронного выполнения вызовов функций run_process(). Метод submit принимает функцию вместе с аргументами, передаваемыми при ее вызове, и возвращает результат выполнения функции run_process. Метод wait используется для блокировки запущенного асинхронно кода до завершения всех выполняемых им задач.

Стоит отметить, что вы можете легко использовать в вашем коде многопроцессорность, используя класс ProcessPoolExecutor, поскольку и ProcessPoolExecutor, и ThreadPoolExecutor реализуют для один и тот же интерфейс:

# перебираем страницы и получаем информацию о них
with ProcessPoolExecutor() as executor:
    for number in range(1, 21):
        futures.append(
            executor.submit(run_process, number, output_filename, headless)
        )

Почему в нашем примере мы используем многопоточность вместо многопроцессорности?

Скарпинг веб-страниц в большей степени связан с выполнением операций ввода-вывода I/O, поскольку получение по сети HTML кода (I/O) происходит медленнее, чем непосредственно его парсинг (ЦП). Чтобы узнать об этом больше, а также о разнице между параллелизмом parallelism (многопроцессорностью) и параллелизмом concurrency (многопоточностью), ознакомьтесь со статьей Speeding Up Python with Concurrency, Parallelism, and asyncio.

Запустим наш усовершенствованный парсер:

(env)$ python script_concurrent.py

Elapsed run time: 38.73605298995972 seconds

С кодом нашего парсера, работающего в многопоточном режиме, можно ознакомиться по ссылке.

Чтобы еще более ускорить процесс, мы можем запустить Chrome в headless режиме, передав в качестве аргумента в командной строке значение headless:

(env)$ python script_concurrent.py headless

Running in headless mode

Elapsed run time: 35.12011382590508 seconds

Заключение

Немного модифицировав исходный код нашего веб-скарпера мы смогли распараллелить его работу, что позволило сократить время выполнения сценария от 385 секунд до чуть более 35 секунд. В нашем случае этот подход позволил увеличить быстродействие скрипта на 90%, что является достаточно эффективным решением.

Надеюсь, материалы этой статьи помогут вам в вашей работе.

Оставить комментарий