Мультипроцессинг в Python (часть 1)

Многие из вас наверняка бывали в ситуации, когда вам нужно выполнять несколько задач или повторяющиеся действия над несколькими предметами, например, делать домашнее задание или даже такую мелочь, как стирка. Намного проще, когда у нас есть возможность делать несколько дел одновременно. Например, иметь несколько стиральных машин для стирки белья, или 5 человек делают домашнюю работу.

Тот же принцип применим и к вычислениям. Бывают случаи, когда у нас много данных, и мы хотели бы выполнить одно и то же действие со всеми данными. Проблема в том, что это одно и то же действие, а у нас много данных. Это замедляет работу нашей программы.

import time

def our_function():
    print("Processing stuff...")
    time.sleep(5)
    print("Done")

def normal_linear_method():

    our_function()
    our_function()
    our_function()

normal_linear_method()
# Time taken: about 15 seconds
Вход в полноэкранный режим Выход из полноэкранного режима

Предположим, что для выполнения действия или функции над данными требуется ровно 5 секунд. Если нам нужно обработать 100 единиц данных, это займет у нас 500 секунд, то есть около 8 минут нашего времени. Что, если я скажу вам, что есть способ ускорить процесс с 8 минут до 5 секунд?

Многопоточность в Python

Первая техника, которую мы будем использовать для решения нашей проблемы, называется многопоточностью. Многопоточность работает путем постоянного переключения контекста (по сути, состояния задачи, над которой она работает в данный момент) таким образом, что достигается иллюзия параллельной обработки. Эта концепция также известна как параллелизм.

# Example of task speed up using multithreading

from threading import Thread
import time

def using_multithreading():

    # Our threads
    t1  = Thread(target=our_function)
    t2 = Thread(target=our_function)
    t3 = Thread(target=our_function)

    # Starting our threads
    t1.start()
    t2.start()
    t3.start()

    # We join the threads/processes so our main thread/process
    # can wait for it to be completed before terminating

    t1.join()
    t2.join()
    t3.join()

using_multithreading()
# time taken: about 5 seconds
Вход в полноэкранный режим Выход из полноэкранного режима

Мультипроцессинг в Python

Вторая техника, которую мы будем использовать для решения нашей проблемы, — это многопоточность. В то время как многопоточность в python использует переключение контекста, многопроцессорность в python запускает каждый из процессов параллельно. Каждый процесс имеет свою собственную копию памяти всей программы и работает на своем собственном ядре.

# Example of task speed up using multiprocessing
import time
from multiprocessing import Process

def using_multiprocessing():
    # Our processes
    p1  = Process(target=our_function)
    p2 = Process(target=our_function)

    # Starting our processes
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':

    start = time.perf_counter()
    using_multiprocessing()
    stop = time.perf_counter()

    print("Time taken {}".format(stop-start))
Вход в полноэкранный режим Выход из полноэкранного режима

Многопроцессорная обработка против многопоточности: Параллелизм и параллелизм

И многопроцессорность, и многопоточность могут пригодиться. Вопрос в том, когда и что использовать.

  • Мы используем многопоточность для операций, связанных с вводом-выводом, таких как чтение данных из файла или объединение данных с сервера.
  • Мы используем многопоточность для операций, привязанных к процессору, таких как обработка изображений, обучение модели машинного обучения, обработка больших данных и т.д.

Запуск нескольких процессов одновременно

Бывают случаи, когда мы хотим запустить функцию на последовательности данных. Скажем, у нас есть список из 100 единиц данных, и мы хотим применить нашу функцию ко всем из них параллельно или одновременно. Мы можем использовать различные подходы:

Подход 1: итеративно создавать процессы и запускать их

В этом подходе мы использовали цикл, чтобы создать процесс для всех наших данных и запустить их. Проблема этого подхода заключается в том, что мы не можем легко получить выходные данные процессов.

import time
from multiprocessing import Process

def multiple_processes():

    # Spawn our processes iteratively
    processes = [
        Process(target=operation, args=(x,)) 
        for x in data
    ]

    for process in processes:
        # Iteratively start all processes
        process.start()

    for process in processes:
        process.join()

    return 

if __name__ == '__main__':

    start = time.perf_counter()
    multiple_processes()
    stop = time.perf_counter()

    print("Time taken {}".format(stop-start))
    # time taken: about 8 seconds
Вход в полноэкранный режим Выход из полноэкранного режима

Подход 2: Исполнитель ProcessPoolExecutor

В этом подходе мы используем так называемый пул, который является более простым и аккуратным способом управления вычислительными ресурсами. Хотя этот способ медленнее, чем итеративное порождение процессов, он более аккуратен и позволяет нам использовать вывод этих процессов в нашем основном процессе.

# Using multiprocessing with ProcessPoolExecutor
import time
from concurrent.futures import 
    ProcessPoolExecutor, as_completed


def multiple_processes_pooling():

    with ProcessPoolExecutor() as executor:
        process_futures = [
            executor.submit(operation, x) 
            for x in data
        ]
        results = [
            p.result() 
            for p in 
            as_completed(process_futures)
        ]

        print(results)


if __name__ == '__main__':

    start = time.perf_counter()
    multiple_processes_pooling()
    stop = time.perf_counter()

    print("Time taken {}".format(stop-start))
    # time taken: about 50 seconds
Вход в полноэкранный режим Выход из полноэкранного режима

Подход 3: ProcessPoolExecutor().map

В этом подходе вместо итеративной отправки процесса исполнителю нашего пула мы использовали метод executor.map для отправки всех данных в списке сразу. Выходом этой функции является результат всех завершенных процессов.

import time
from concurrent.futures import ProcessPoolExecutor

# Using the executor.map
def pooling_map():

    with ProcessPoolExecutor() as executor:
        results = executor.map(operation, data)

        print([res for res in results])

if __name__ == '__main__':

    start = time.perf_counter()
    pooling_map()
    stop = time.perf_counter()

    print("Time taken {}".format(stop-start))
    # time taken: about 50 seconds
Вход в полноэкранный режим Выход из полноэкранного режима

Очень важно помнить

Если вы посмотрите на вывод времени, то заметите, что затраченное время не является точной единицей времени, на это влияют четыре основных фактора.

  • Используемый компьютер может влиять на время, а также другие программы, запущенные на вашем ПК. Код был протестирован с использованием компьютера intel Core i5 7-го поколения.

  • Нашей программе требуется несколько микросекунд, чтобы правильно настроить процессы и запустить их.

  • Когда процессов больше, чем ядер процессора, наша система автоматически ставит в очередь ожидающие процессы и помогает нам правильно ими управлять.

  • И, наконец, нашей программе требуется несколько микросекунд, чтобы правильно закрыть процессы.

Учитывая это, важно отметить, что мы используем многопроцессорную обработку только в тех случаях, когда данных много и на выполнение операции требуется много времени.

Заключение

  • Многопроцессорность и многопоточность помогают нам ускорить работу наших программ.

  • Многопроцессорная обработка лучше всего подходит для операций, связанных с процессором, таких как машинное обучение и обработка данных.

  • Многопоточность больше всего подходит для операций, связанных с IO, таких как взаимодействие с серверами или файловой системой.

  • Многопоточность — это не волшебная палочка; не используйте ее без необходимости, иначе она может замедлить работу вашего кода.

Оцените статью
devanswers.ru
Добавить комментарий