Как реализовать рабочие пулы в Rust

Первоначально опубликовано на моем сайте: https://kerkour.com/rust-worker-pool

Подумайте дважды. Рабочие пулы не очень подходят для Rust из-за его модели владения. Вместо этого возьмите на вооружение функциональное программирование и неизменяемые данные. Rust предоставляет более простые в использовании и элегантные инструменты: параллельные итераторы и потоки.

Важно отметить, что, как и при работе с рабочими пулами в любом языке программирования, всегда следует устанавливать верхний предел параллелизма. В противном случае вы можете быстро исчерпать ресурсы вашей системы.

Для:

  • заданий с интенсивными вычислениями
  • Задания с интенсивным вводом-выводом

Задания с интенсивными вычислениями

Для вычислительно-интенсивных заданий (с привязкой к процессору) существует крейт rayon, который предоставляет параллельные итераторы: Итераторы, комбинаторы которых отправляются в пул потоков. Приятно то, что пул потоков скрыт от нас, разработчиков. Мы просто должны писать код, как если бы мы использовали стандартные итераторы.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

rayon = "1"
Вход в полноэкранный режим Выход из полноэкранного режима

main.rs

use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::time::Duration;

fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    std::thread::sleep(Duration::from_millis(sleep_ms));

    job * job
}

fn process_result(result: i64) {
    println!("{}", result);
}

fn main() {
    let jobs = 0..100;

    jobs.into_par_iter()
        .map(compute_job)
        .for_each(process_result);
}
Войти в полноэкранный режим Выйти из полноэкранного режима

По умолчанию пул потоков имеет размер, равный количеству логических CPU машины.

Задания с интенсивным вводом/выводом (async)

Для заданий, связанных с вводом/выводом, нам нужно перейти на землю async. Точнее, мы будем использовать Streams, которые представляют собой async итераторы, способные обрабатывать элементы одновременно.

Но сам признак Stream не предоставляет комбинаторов. Нам нужно импортировать признак StreamExt из ящика futures.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

tokio = { version = "1", features = ["full"] }
futures = "0.3"
Вход в полноэкранный режим Выход из полноэкранного режима

for_each_concurrent

for_each_concurrent является самым простым в использовании, так как потребляет поток. Это означает, что он возвращает не сам поток, а Future, который может быть .awaited.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .for_each_concurrent(concurrency, |job| async move {
            let result = compute_job(job).await;
            process_result(result).await;
        })
        .await;
}
Вход в полноэкранный режим Выход из полноэкранного режима

buffer_unordered

С другой стороны, buffer_unordered не потребляет поток. Поэтому нам нужно использовать for_each в качестве поглотителя для потребления потока.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .for_each(process_result)
        .await;
}
Вход в полноэкранный режим Выход из полноэкранного режима

Сбор результатов

Иногда, вместо непосредственной обработки результатов, нам может понадобиться их собрать, например, для последующей пакетной отправки. Хорошая новость, метод collect доступен в Streams:

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    let results: Vec<i64> = stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .collect()
        .await;
}
Вход в полноэкранный режим Выйти из полноэкранного режима

Код находится на GitHub

Как обычно, вы можете найти код на GitHub: github.com/skerkour/kerkour.com (пожалуйста, не забудьте отметить репо 🙏).

Оцените статью
devanswers.ru
Добавить комментарий