Мультипроцессинг в Python (часть 2)


РАСПОЗНАВАНИЕ ЛИЦ С ПОМОЩЬЮ PYTHON И МНОГОПРОЦЕССОРНОЙ ОБРАБОТКИ

В этом посте мы напишем простой сценарий, чтобы проиллюстрировать, насколько удобными могут быть многопроцессорная обработка и многопоточность в реальных ситуациях.
Мы напишем простую программу, которая загружает изображения с нашего компьютера, определяет и рисует прямоугольники на всех человеческих лицах и загружает их на сервер.

Инструменты

  1. Python
  2. OpenCV
  3. Flask для нашего сервера
  4. Модуль Python requests для отправки наших изображений

НАШ АЛГОРИТМ РАСПОЗНАВАНИЯ ЛИЦ

Итак, этот код в основном обрабатывает всю локальную обработку, Чтобы понять больше о том, как работает распознавание лиц, ознакомьтесь с
realpython.com


import cv2

from requests import post

# Create the haar cascade
cascPath = "haarcascade_frontalface_default.xml"
faceCascade = cv2.CascadeClassifier(cascPath)

def draw_face(image):

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect faces in the image
    faces = faceCascade.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30)
    )

    # Draw a rectangle around the faces
    for (x, y, w, h) in faces:
        cv2.rectangle(image, (x, y), (x+w, y+h), (0, 255, 0), 2)

    return image

def read_image(image_path):
    return cv2.imread(image_path)

def write_image(image, image_path="output.jpg"):
    fn = f"out-{image_path}"
    cv2.imwrite(fn, image)
    return fn

def upload_image(fn):
    url = "http://localhost:5000/upload"
    files = {'file': open(fn, "rb")}

    return post(url, files=files).status_code

Вход в полноэкранный режим Выход из полноэкранного режима

ПРОСТОЙ СЕРВЕР

Это простой WSGI-сервер, написанный на Flask, чтобы позволить нам загружать наши файлы.

# app.py
from flask import Flask, request

app = Flask(__name__)

@app.post("/upload")
def upload_file():

    file = request.files['file']

    file.save(f"./temp/{file.filename}")

    return {"status": True}


if __name__ == '__main__':
    app.run()

Войти в полноэкранный режим Выйти из полноэкранного режима

Запустите это в новом окне терминала

$ python app.py
Войти в полноэкранный режим Выход из полноэкранного режима

НОРМАЛЬНЫЙ СПОСОБ ОБРАБОТКИ (синхронно)

Если мы протестируем эту функцию с 1000 изображениями, запустив наш flask-сервер локально, это займет много времени, потому что он обрабатывает их синхронно.

Синхронное программирование или выполнение в основном означает, что следующий процесс должен дождаться завершения текущего процесса, прежде чем начать его.

Синхронное выполнение подходит только в том случае, если текущий процесс нуждается в данных, сгенерированных предыдущим процессом.


import time

def normal_way(image_paths):
    for image_path in image_paths:
        image = read_image(image_path)
        image = draw_face(image)
        fn = write_image(image, image_path)
        upload_image(fn)

if __name__ == '__main__':
    # Trying this 100 times to simulate 100 images
    image_paths = ["a.jpg"] * 100

    start = time.time()
    normal_way(image_paths)
    stop = time.time()

    # Time taken for synchronous: 309.1052303314209
    print(f"Time taken for synchronous: {stop-start}")
Вход в полноэкранный режим Выход из полноэкранного режима

Если бы у нас было 1000 образцов, то затраченное время составило бы примерно 3091,05 секунды.

НАШ ОПТИМИЗИРОВАННЫЙ КОД (АСИНХРОННОЕ ВЫПОЛНЕНИЕ)

Асинхронное программирование, по сути, противоположно синхронному программированию, следующий процесс может запускаться одновременно или параллельно с текущим процессом.

Существует множество способов добиться этого: Цикл событий, многопоточность, многопроцессорность и т.д. В данном учебнике мы будем использовать только многопоточность и многопроцессорность.

Мы будем использовать многопоточность для функций, основанных на IO: чтение образов с диска, запись на диск и загрузка на сервер.

Мы будем использовать многопоточность для функции, требующей больших затрат процессора, — функции, которая обнаруживает и рисует прямоугольники на лицах.


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def optimised_solution(image_paths: list):

    # Define our pool executors
    with ThreadPoolExecutor() as thread_pool:
        with ProcessPoolExecutor() as process_pool:

            # We use threading to read our images
            images = thread_pool.map(read_image, image_paths)

            # We use multiprocessing to process our images
            processed_images = process_pool.map(draw_face,images)

            # We use multithreadfing to write our images to disk
            files = thread_pool.map(write_image, processed_images)

            # And finally, we use multithreading to push our images to our server
            responses = thread_pool.map(upload_image, files)

            print("Processed: ", all([response == 200 for response in responses]))


if __name__ == '__main__':
    # Trying this 100 times to simulate 100 images
    image_paths = ["a.jpg"] * 100

    start = time.time()
    optimised_solution(image_paths)
    stop = time.time()

    # Time taken for Optimised 1: 147.17485308647156
    print(f"Time taken for Optimised 1: {stop-start}")
Вход в полноэкранный режим Выход из полноэкранного режима

ОСНОВНЫЕ ВЫВОДЫ

  • Обратите внимание, что наше решение, оптимизированное с помощью мультипроцессинга, заняло примерно в два раза меньше времени для завершения выполнения.

  • Наша программа теперь имеет возможность горизонтального масштабирования с увеличением количества процессоров благодаря концепции парралельных вычислений, рассмотренной в части 1.

  • Если бы мы использовали асинхронный сервер (ASGI), наша программа была бы более оптимизирована, но давайте оставим это на другой день.

ПОЛНЫЙ КОД

import cv2
import time

from requests import post

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor



cascPath = "haarcascade_frontalface_default.xml"

# Create the haar cascade
faceCascade = cv2.CascadeClassifier(cascPath)

def draw_face(image):
    # More Info: https://realpython.com/face-recognition-with-python/

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect faces in the image
    faces = faceCascade.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30)
    )

    # Draw a rectangle around the faces
    for (x, y, w, h) in faces:
        cv2.rectangle(image, (x, y), (x+w, y+h), (0, 255, 0), 2)

    return image

def read_image(image_path):
    return cv2.imread(image_path)

def write_image(image, image_path="output.jpg"):
    fn = f"out-{image_path}"
    cv2.imwrite(fn, image)
    return fn

def upload_image(fn):
    url = "http://localhost:5000/upload"
    files = {'file': open(fn, "rb")}

    return post(url, files=files).status_code


def normal_way(image_paths):
    for image_path in image_paths:
        image = read_image(image_path)
        image = draw_face(image)
        fn = write_image(image, image_path)
        upload_image(fn)

def optimised_solution(image_paths: list):

    # Define our pool executors
    with ThreadPoolExecutor() as thread_pool:
        with ProcessPoolExecutor() as process_pool:

            # We use threading to read our images
            images = thread_pool.map(read_image, image_paths)

            # We use multiprocessing to process our images
            processed_images = process_pool.map(draw_face,images)

            # We use multithreadfing to write our images to disk
            files = thread_pool.map(write_image, processed_images)

            # And finally, we use multithreading to push our images to our server
            responses = thread_pool.map(upload_image, files)

            print("Processed: ", all([response == 200 for response in responses]))


if __name__ == '__main__':
    # Trying this 100 times to simulate 100 images
    image_paths = ["a.jpg"] * 100

    start = time.time()
    normal_way(image_paths)
    stop = time.time()

    # Time taken for synchronous: 309.1052303314209
    print(f"Time taken for synchronous: {stop-start}")

    start = time.time()
    optimised_solution(image_paths)
    stop = time.time()

    # Time taken for Optimised 1: 147.17485308647156
    print(f"Time taken for Optimised 1: {stop-start}")

Вход в полноэкранный режим Выход из полноэкранного режима

ЗАКЛЮЧЕНИЕ

  • Многопроцессорность и многопоточность могут быть объединены, чтобы значительно повысить производительность нашего кода.

  • Синхронное выполнение может масштабироваться только при наличии более быстрых процессоров, но многопоточность позволяет нам масштабироваться с большим количеством процессоров.

Не стесняйтесь задавать любые вопросы в разделе комментариев или обращайтесь ко мне по адресу ikabolo59@gmail.com и в Twitter.

Спасибо

Оцените статью
devanswers.ru
Добавить комментарий