В этой статье мы изучим способы как сократить время обработки большого файла с данными, используя модуль multiprocessing, библиотеку joblib и средств многопроцессорной обработки модуля tqdm.

Обычно для параллельной обработки задачу на одинаковые по объему подзадачи. И хотя такой подход приводит к необходимости управлять их запуском, сохранять результаты и следить за состоянием, он существенно сокращает общее время работы программы.

Например, вы работаете с большим CSV файлом. И нужно изменить в нем один столбец, а затем сохранить результат.

Прочитав файл, мы передаём значения столбца в виде массива в функцию для обработки. В зависимости от количества доступных воркеров workers, запустим параллельно несколько процессов для ее выполнения. При этом число воркеров и соответственно запущенных процессов определяется количеством ядер процессора.

Примечание: использование параллельной обработки для небольшого набора данных улучшит время обработки очень незначительно. А время работы скриптов может варьировать от машины к машине.

Начало

Для примера мы будем использовать набор табличных данных US Accidents (2016 — 2021) от Kaggle, который состоит из 2,8 миллиона записей и 47 столбцов.

Сначала импортируем следующие модули.

  • multiprocessing, joblib и tqdm, которые будем использовать непосредственно для обработки данных parallel processing.
  • pandas для ввода данных data ingestions.
  • re, nltk и string для работы с текстом text processing.
# Модули для параллельной обработки Parallel processing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Модули для ввода данных Data ingestions 
import pandas as pd

# Модули для обработки текстовых данных Text Processing 
import re 
from nltk.corpus import stopwords
import string

Прежде чем мы начнем необходимо определить число воркеров n_workers. Для этого используем функцию cpu_count() из модуля multiprocessing.

Функция cpu_count возвращает целочисленное значение соответствующее количеству ядер процессора на вашей машине. Если процессор поддерживает технологию Hyper-Threading, то функция вернёт количество логических ядер (процессоров). В противном случае функция вернет None.

n_workers = mp.cpu_count()
print(f"{n_workers} workers are available")

>>> 8 workers are available

На следующем шаге прочитаем исходный большой CSV файл в в объект dataframe, используя функцию pandas read_csv. Выведем в консоли размер полученного объекта, названия его столбцов и продолжительность операции чтения файла.

Примечание. Используем магическую команду Jupyter %%time, которая выведет время компиляции и выполнения кода ячейки.

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

Этот код выведет в консоли следующее.

Shape:(2845342, 47)

Column Names:

Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

Основная функция очистки текста

Напишем функцию clean_text, которая в качестве аргумента будет принимать текст, а затем очищать его от стоп-слов и пробельных символов.

Сначала мы получим английские стоп-слова, используя модуль nltk.copus, чтобы отфильтровать их из исходного текста. Потом удалим из него специальные символы и лишние пробелы.

Это будет основная (рабочая) функция, по времени выполнения которой мы будем сравнивать последовательный serial, параллельный parallel и пакетный batch способы обработки нашего файла.

def clean_text(text): 
  # Убираем стоп-слова
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word not in stops])
  # Убираем специальные символы
  text = text.translate(str.maketrans('', '', string.punctuation))
  # Убираем лишние пробельные символы
  text = re.sub(' +',' ', text)
  return text

Последовательная обработка Serial Processing

Для последовательной обработки используем функцию pandas.apply(). Если вы хотите увидеть индикатор выполнения в консоли, нужно инициализировать модуль tqdm для pandas, а затем использовать его функцию progress_apply().

И так мы собираемся обработать 2,8 миллиона записей и сохранить результат обратно в столбец Description.

%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)

Достаточно высокопроизводительному процессору потребовалось 9 минут и 5 секунд для последовательной обработки исходного файла.

Вывод в консоли:

100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

Используем модуль multiprocessing

Рассмотрим способ обработки больших файлов с помощью модуля multiprocessing. Это встроенный модуль Python, который часто используется для решения подобных задач.

Сначала инициализируем многопроцессорный пул с 8 воркерами, а затем воспользуемся функцией map, чтобы организовать сам процесс обработки. Для отображения индикатора прогресса снова используем модуль tqdm.

Функция map принимает два аргумента. В качестве первого передается функция, а второго — список (массив) обрабатываемых значений.

%%time
p = mp.Pool(n_workers) 

df['Description'] = p.map(clean_text, tqdm(df['Description']))

Вывод в консоли:

100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

Как видим время работы скрипта сократилось почти в 3 раза с 9 минут 5 секунд до 3 минут 51 секунды.

Параллельная обработка Parallel processing

Для параллельной обработки файлов можно также использовать класс Parallel и функцию delayed из библиотеки joblib.

Модуль joblib — это набор инструментов для упрощения конвейерной обработки данных в Python. В нем реализовано кэширование операций с данными на диске, встроен свой механизм отслеживания изменений (паттерн memoize), а также реализованы средства для параллельной обработки.

Следующий пример кода позволяет решить задачу нашу задачу с использованием библиотеки joblib.

def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers, backend="multiprocessing")
  (delayed(clean_text)(text) for text in tqdm(array))
  return result

Разберем, что делает функция text_parallel_clean, которая при вызове принимает массив значений столбца Description.

  1. Инициализируется новый экземпляр класса Parallel с передачей двух аргументов: n_jobs = 8 и backend = multiprocessing.
  2. В функцию delayed передаем рабочую функцию clean_text.
  3. Создаем генератор списка с выбором по одному значений столбца Description.

Запустим наш код в блокноте.

%%time
df['Description'] = text_parallel_clean(df['Description'])

Вывод в консоли:

И так обработка файла заняла у нас на 13 секунд больше, чем предыдущий вариант с многопроцессорным пулом. Но в любом случае использование класса Parallel на 4 минуты 59 секунд быстрее, чем последовательная обработка содержимого файла.

100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

Параллельная пакетная обработка Parallel Batch Processing

Еще один способ работы с большими файлами, разбивать их на пакеты (отдельные части) batches и обрабатывать каждый параллельно.

Начнем с функции, которая разделит наш файл на несколько частей по числу воркеров. В нашем случае получаем 8 пакетов.

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [array[ix:ix + batch_size] for ix in tqdm(range(0, file_len, batch_size))]
  return batches

Следующая функция будет запускать основную функцию очистки clean_text для каждого пакета данных.

def proc_batch(batch):
  return [clean_text(text) for text in batches]

Далее мы будем использовать класс Parallel и функцию delayed, но теперь для пакетной обработки.

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")
(delayed(proc_batch)(batch) for batch in tqdm(batches))

df['Description'] = [j for i in batch_output for j in i]

Вывод в консоли:

И так, мы еще немного улучшили время обработки нашего файла. Такой способ часто используется для обработки сложных данных и работы с моделями глубокого обучения.

100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

Используем модуль tqdm

Модуль также tqdm позволяет решать задачи многопроцессорной обработки просто и эффективно. И именно его я рекомендую прежде всего.

Ознакомьтесь с документацией этого модуля, чтобы узнать больше о реализации многопроцессорной обработки с помощью функции process_map.

Для ее вызова требуется:

  1. Наименование основной функции обработки (в нашем случае это clean_text).
  2. Массив обрабатываемых данных (значения из столбца Description).
  3. Число воркеров max_workers.
  4. Размер (длинна) одного пакета данных chucksize.

Но сначала предварительно рассчитаем длину отдельного пакета данных, используя число воркеров.

%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)

df['Description'] = process_map(clean_text, df['Description'], max_workers=n_workers, chunksize=batch)

Вывод в консоли:

С помощью только одной строки кода мы получаем лучший результат.

100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

Заключение

Часто нужно найти баланс и выбрать наиболее рациональный способ для решения задачи: последовательная, параллельная или пакетная обработка данных. При этом параллельная обработка может иметь неприятные последствия, если входные данные имеют малый размер или имеют разные уровни вложенности.

Оставить комментарий