Golang — Работа с каналами, шаг за пределы основ.


Короткая история…

Я помогал коллеге с задачей, связанной с асинхронными задачами на golang. «Как лучше всего сделать вещь «х»?». В тот момент передо мной открылось огромное количество возможностей для решения этой проблемы. Как же объяснить это удовлетворительно, не превратив следующие полчаса в скучный монолог?

Идея компиляции решений

Некоторое время назад я прочитал это решение, которое автор также не приписывал Вам, новинка в программировании (только не!). Статья была следующая MULTIPLEXING CHANNELS IN GO, я советую вам прочитать ее, если вы хотите углубиться в некоторые другие темы, как каналы.

Далее я представлю идею мультиплексирования в самом прямом виде.

Концепция:

Речь идет о возможности объединения множества каналов в один канал. Таким образом, мы можем свести данные к одной точке, которая станет возобновлением синхронности нашего кода.

Мультиплексор (сокращение: MUX), иногда называемый англицизмом multiplexer или multiplex, — это устройство, которое выбирает информацию из двух или более источников данных в одном канале. Они используются в ситуациях, когда стоимость реализации отдельных каналов для каждого источника данных выше, чем стоимость и неудобство использования функций мультиплексирования/демультиплексирования. — Википедия. Обратите внимание на изображения.



Перейдем к коду

Мы выполним следующие действия:

  1. Мы создадим функцию распространения/расхождения, отвечающую за распределение Заданий между несколькими Работниками.
  2. Создадим функцию Worker, которая будет отвечать за выполнение задания и передавать результаты в канал мультиплексирования (Result)
  3. Мы создадим функцию агрегации, отвечающую за получение результатов и возврат к синхронному потоку выполнения.

Мы рассмотрим для этого сценария следующие предположения:

// Tipo gerado para cada task poderíamos ter uma struct aqui
type Task int

// Tipo gerado para cada result poderíamos ter uma struct aqui
type Result int

// Função que representa um tempo de execução qualquer 
// para finalização da tarefa do worker
func asyncSimulation(t Task) int {
    time.Sleep(1 * time.Second)
    return (int(t) * int(t))
}
Войдите в полноэкранный режим Выход из полноэкранного режима

0 — Наша основная функция:

func main() {
        // Trabalharemos com 10 tasks para serem completadas
    tasks := taskGenerator(1, 10)
    results := make(chan Result)
        // Esse wait group servirá para controlar o fechamento
        // do canal de result e evitar deadlocks 
    wg := &sync.WaitGroup{}

        // 5 workers serão utilizados nesse exemplo
    for i := 0; i < 5; i++ {
        wg.Add(1)
        worker(tasks, results, wg)
    }

        // Verificamos o momento que todos os workers 
        // terminarem de trabalhar
    verifyEnd(results, wg)

        // Recebemos todos os resultados no canal result
        // e imprimimos á medida que são resolvidos
    resultAggregator(results)
}
Войдите в полноэкранный режим Выход из полноэкранного режима

1 — Функция распространения:

Это функция, отвечающая за подачу в канал задач заданий, которые будут выполняться.
Поскольку мы не определили размер буфера канала, необходимо, чтобы распределение задач выполнялось в горутине, чтобы не возникло тупика.

func taskGenerator(start int, end int) <-chan Task {
    tasks := make(chan Task)

    go func() {
        for i := start; i < end; i++ {
            tasks <- Task(i)
        }

        close(tasks)
    }()

    return tasks
}
Войдите в полноэкранный режим Выход из полноэкранного режима

2 — Работа с более чем одним работником

Для работы с более чем одним работником нам также необходимо управлять закрытием канала результатов, для этого мы определяем группу ожидания для работников

func worker(tasks <-chan Task, result chan<- Result, wg *sync.WaitGroup) {

    go func() {
        for task := range tasks {
            result <- Result(asyncSimulation(task))
        }
        wg.Done()
    }()
}

func verifyEnd(results chan<- Result, wg *sync.WaitGroup) {
    go func() {
        wg.Wait()
        close(results)
    }()
}
Войдите в полноэкранный режим Выход из полноэкранного режима

3 — Функция агрегации:

Мы используем эту функцию для получения всех данных, переданных в канал результатов, а затем суммируем их и выводим на экран

func resultAggregator(res <-chan Result) {
    sum := 0
    totalResults := 0

    for res := range res {
        fmt.Printf("received result %vn", res)
        sum += int(res)
        totalResults += 1
    }

    fmt.Printf("total os squares received: %dn", totalResults)
    fmt.Printf("sum of squares: %d", sum)
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Анализ результата

// Após 1 segundo recebemos os primeiros 5 resultados
// tempo gasto pelos 5 workers para processar as primeiras
// 5 tasks
received result 16
received result 9
received result 4
received result 0
received result 1
// Após mais 1 segundo recebemos os próximos 5 resultados
// tempo gasto pelos 5 workers para processar as últimas
// 5 tasks
received result 36
received result 25
received result 49
received result 64
received result 81
// impressão das demais informações do agregador
total os squares received: 10
sum of squares: 285
Войдите в полноэкранный режим Выход из полноэкранного режима

Расширение применимости:

Эта концепция мультиплексирования — лишь одна из нескольких, которые мы можем использовать для обеспечения асинхронной работы при решении различных задач. Его применение варьируется от запросов к внешним API до выполнения дорогостоящих алгоритмов в небольших потоках.


Источники

  • МУЛЬТИПЛЕКСИРОВАНИЕ КАНАЛОВ В GO — https://katcipis.github.io/blog/mux-channels-go/

  • Мои го-решения на 2017 год — https://research.swtch.com/go2017

  • Мультиплексор — https://pt.wikipedia.org/wiki/Multiplexador

Оцените статью
devanswers.ru
Добавить комментарий