РАСПОЗНАВАНИЕ ЛИЦ С ПОМОЩЬЮ PYTHON И МНОГОПРОЦЕССОРНОЙ ОБРАБОТКИ
В этом посте мы напишем простой сценарий, чтобы проиллюстрировать, насколько удобными могут быть многопроцессорная обработка и многопоточность в реальных ситуациях.
Мы напишем простую программу, которая загружает изображения с нашего компьютера, определяет и рисует прямоугольники на всех человеческих лицах и загружает их на сервер.
Инструменты
- Python
- OpenCV
- Flask для нашего сервера
- Модуль Python
requests
для отправки наших изображений
НАШ АЛГОРИТМ РАСПОЗНАВАНИЯ ЛИЦ
Итак, этот код в основном обрабатывает всю локальную обработку, Чтобы понять больше о том, как работает распознавание лиц, ознакомьтесь с
realpython.com
import cv2
from requests import post
# Create the haar cascade
cascPath = "haarcascade_frontalface_default.xml"
faceCascade = cv2.CascadeClassifier(cascPath)
def draw_face(image):
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Detect faces in the image
faces = faceCascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30)
)
# Draw a rectangle around the faces
for (x, y, w, h) in faces:
cv2.rectangle(image, (x, y), (x+w, y+h), (0, 255, 0), 2)
return image
def read_image(image_path):
return cv2.imread(image_path)
def write_image(image, image_path="output.jpg"):
fn = f"out-{image_path}"
cv2.imwrite(fn, image)
return fn
def upload_image(fn):
url = "http://localhost:5000/upload"
files = {'file': open(fn, "rb")}
return post(url, files=files).status_code
ПРОСТОЙ СЕРВЕР
Это простой WSGI-сервер, написанный на Flask, чтобы позволить нам загружать наши файлы.
# app.py
from flask import Flask, request
app = Flask(__name__)
@app.post("/upload")
def upload_file():
file = request.files['file']
file.save(f"./temp/{file.filename}")
return {"status": True}
if __name__ == '__main__':
app.run()
Запустите это в новом окне терминала
$ python app.py
НОРМАЛЬНЫЙ СПОСОБ ОБРАБОТКИ (синхронно)
Если мы протестируем эту функцию с 1000 изображениями, запустив наш flask-сервер локально, это займет много времени, потому что он обрабатывает их синхронно.
Синхронное программирование или выполнение в основном означает, что следующий процесс должен дождаться завершения текущего процесса, прежде чем начать его.
Синхронное выполнение подходит только в том случае, если текущий процесс нуждается в данных, сгенерированных предыдущим процессом.
import time
def normal_way(image_paths):
for image_path in image_paths:
image = read_image(image_path)
image = draw_face(image)
fn = write_image(image, image_path)
upload_image(fn)
if __name__ == '__main__':
# Trying this 100 times to simulate 100 images
image_paths = ["a.jpg"] * 100
start = time.time()
normal_way(image_paths)
stop = time.time()
# Time taken for synchronous: 309.1052303314209
print(f"Time taken for synchronous: {stop-start}")
Если бы у нас было 1000 образцов, то затраченное время составило бы примерно 3091,05 секунды.
НАШ ОПТИМИЗИРОВАННЫЙ КОД (АСИНХРОННОЕ ВЫПОЛНЕНИЕ)
Асинхронное программирование, по сути, противоположно синхронному программированию, следующий процесс может запускаться одновременно или параллельно с текущим процессом.
Существует множество способов добиться этого: Цикл событий, многопоточность, многопроцессорность и т.д. В данном учебнике мы будем использовать только многопоточность и многопроцессорность.
Мы будем использовать многопоточность для функций, основанных на IO: чтение образов с диска, запись на диск и загрузка на сервер.
Мы будем использовать многопоточность для функции, требующей больших затрат процессора, — функции, которая обнаруживает и рисует прямоугольники на лицах.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def optimised_solution(image_paths: list):
# Define our pool executors
with ThreadPoolExecutor() as thread_pool:
with ProcessPoolExecutor() as process_pool:
# We use threading to read our images
images = thread_pool.map(read_image, image_paths)
# We use multiprocessing to process our images
processed_images = process_pool.map(draw_face,images)
# We use multithreadfing to write our images to disk
files = thread_pool.map(write_image, processed_images)
# And finally, we use multithreading to push our images to our server
responses = thread_pool.map(upload_image, files)
print("Processed: ", all([response == 200 for response in responses]))
if __name__ == '__main__':
# Trying this 100 times to simulate 100 images
image_paths = ["a.jpg"] * 100
start = time.time()
optimised_solution(image_paths)
stop = time.time()
# Time taken for Optimised 1: 147.17485308647156
print(f"Time taken for Optimised 1: {stop-start}")
ОСНОВНЫЕ ВЫВОДЫ
-
Обратите внимание, что наше решение, оптимизированное с помощью мультипроцессинга, заняло примерно в два раза меньше времени для завершения выполнения.
-
Наша программа теперь имеет возможность горизонтального масштабирования с увеличением количества процессоров благодаря концепции парралельных вычислений, рассмотренной в части 1.
-
Если бы мы использовали асинхронный сервер (ASGI), наша программа была бы более оптимизирована, но давайте оставим это на другой день.
ПОЛНЫЙ КОД
import cv2
import time
from requests import post
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
cascPath = "haarcascade_frontalface_default.xml"
# Create the haar cascade
faceCascade = cv2.CascadeClassifier(cascPath)
def draw_face(image):
# More Info: https://realpython.com/face-recognition-with-python/
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Detect faces in the image
faces = faceCascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30)
)
# Draw a rectangle around the faces
for (x, y, w, h) in faces:
cv2.rectangle(image, (x, y), (x+w, y+h), (0, 255, 0), 2)
return image
def read_image(image_path):
return cv2.imread(image_path)
def write_image(image, image_path="output.jpg"):
fn = f"out-{image_path}"
cv2.imwrite(fn, image)
return fn
def upload_image(fn):
url = "http://localhost:5000/upload"
files = {'file': open(fn, "rb")}
return post(url, files=files).status_code
def normal_way(image_paths):
for image_path in image_paths:
image = read_image(image_path)
image = draw_face(image)
fn = write_image(image, image_path)
upload_image(fn)
def optimised_solution(image_paths: list):
# Define our pool executors
with ThreadPoolExecutor() as thread_pool:
with ProcessPoolExecutor() as process_pool:
# We use threading to read our images
images = thread_pool.map(read_image, image_paths)
# We use multiprocessing to process our images
processed_images = process_pool.map(draw_face,images)
# We use multithreadfing to write our images to disk
files = thread_pool.map(write_image, processed_images)
# And finally, we use multithreading to push our images to our server
responses = thread_pool.map(upload_image, files)
print("Processed: ", all([response == 200 for response in responses]))
if __name__ == '__main__':
# Trying this 100 times to simulate 100 images
image_paths = ["a.jpg"] * 100
start = time.time()
normal_way(image_paths)
stop = time.time()
# Time taken for synchronous: 309.1052303314209
print(f"Time taken for synchronous: {stop-start}")
start = time.time()
optimised_solution(image_paths)
stop = time.time()
# Time taken for Optimised 1: 147.17485308647156
print(f"Time taken for Optimised 1: {stop-start}")
ЗАКЛЮЧЕНИЕ
-
Многопроцессорность и многопоточность могут быть объединены, чтобы значительно повысить производительность нашего кода.
-
Синхронное выполнение может масштабироваться только при наличии более быстрых процессоров, но многопоточность позволяет нам масштабироваться с большим количеством процессоров.
Не стесняйтесь задавать любые вопросы в разделе комментариев или обращайтесь ко мне по адресу ikabolo59@gmail.com и в Twitter.
Спасибо