Короткая история…
Я помогал коллеге с задачей, связанной с асинхронными задачами на golang. «Как лучше всего сделать вещь «х»?». В тот момент передо мной открылось огромное количество возможностей для решения этой проблемы. Как же объяснить это удовлетворительно, не превратив следующие полчаса в скучный монолог?
Идея компиляции решений
Некоторое время назад я прочитал это решение, которое автор также не приписывал Вам, новинка в программировании (только не!). Статья была следующая MULTIPLEXING CHANNELS IN GO, я советую вам прочитать ее, если вы хотите углубиться в некоторые другие темы, как каналы.
Далее я представлю идею мультиплексирования в самом прямом виде.
Концепция:
Речь идет о возможности объединения множества каналов в один канал. Таким образом, мы можем свести данные к одной точке, которая станет возобновлением синхронности нашего кода.
Мультиплексор (сокращение: MUX), иногда называемый англицизмом multiplexer или multiplex, — это устройство, которое выбирает информацию из двух или более источников данных в одном канале. Они используются в ситуациях, когда стоимость реализации отдельных каналов для каждого источника данных выше, чем стоимость и неудобство использования функций мультиплексирования/демультиплексирования. — Википедия. Обратите внимание на изображения.
Перейдем к коду
Мы выполним следующие действия:
- Мы создадим функцию распространения/расхождения, отвечающую за распределение Заданий между несколькими Работниками.
- Создадим функцию Worker, которая будет отвечать за выполнение задания и передавать результаты в канал мультиплексирования (Result)
- Мы создадим функцию агрегации, отвечающую за получение результатов и возврат к синхронному потоку выполнения.
Мы рассмотрим для этого сценария следующие предположения:
// Tipo gerado para cada task poderíamos ter uma struct aqui
type Task int
// Tipo gerado para cada result poderíamos ter uma struct aqui
type Result int
// Função que representa um tempo de execução qualquer
// para finalização da tarefa do worker
func asyncSimulation(t Task) int {
time.Sleep(1 * time.Second)
return (int(t) * int(t))
}
0 — Наша основная функция:
func main() {
// Trabalharemos com 10 tasks para serem completadas
tasks := taskGenerator(1, 10)
results := make(chan Result)
// Esse wait group servirá para controlar o fechamento
// do canal de result e evitar deadlocks
wg := &sync.WaitGroup{}
// 5 workers serão utilizados nesse exemplo
for i := 0; i < 5; i++ {
wg.Add(1)
worker(tasks, results, wg)
}
// Verificamos o momento que todos os workers
// terminarem de trabalhar
verifyEnd(results, wg)
// Recebemos todos os resultados no canal result
// e imprimimos á medida que são resolvidos
resultAggregator(results)
}
1 — Функция распространения:
Это функция, отвечающая за подачу в канал задач заданий, которые будут выполняться.
Поскольку мы не определили размер буфера канала, необходимо, чтобы распределение задач выполнялось в горутине, чтобы не возникло тупика.
func taskGenerator(start int, end int) <-chan Task {
tasks := make(chan Task)
go func() {
for i := start; i < end; i++ {
tasks <- Task(i)
}
close(tasks)
}()
return tasks
}
2 — Работа с более чем одним работником
Для работы с более чем одним работником нам также необходимо управлять закрытием канала результатов, для этого мы определяем группу ожидания для работников
func worker(tasks <-chan Task, result chan<- Result, wg *sync.WaitGroup) {
go func() {
for task := range tasks {
result <- Result(asyncSimulation(task))
}
wg.Done()
}()
}
func verifyEnd(results chan<- Result, wg *sync.WaitGroup) {
go func() {
wg.Wait()
close(results)
}()
}
3 — Функция агрегации:
Мы используем эту функцию для получения всех данных, переданных в канал результатов, а затем суммируем их и выводим на экран
func resultAggregator(res <-chan Result) {
sum := 0
totalResults := 0
for res := range res {
fmt.Printf("received result %vn", res)
sum += int(res)
totalResults += 1
}
fmt.Printf("total os squares received: %dn", totalResults)
fmt.Printf("sum of squares: %d", sum)
}
Анализ результата
// Após 1 segundo recebemos os primeiros 5 resultados
// tempo gasto pelos 5 workers para processar as primeiras
// 5 tasks
received result 16
received result 9
received result 4
received result 0
received result 1
// Após mais 1 segundo recebemos os próximos 5 resultados
// tempo gasto pelos 5 workers para processar as últimas
// 5 tasks
received result 36
received result 25
received result 49
received result 64
received result 81
// impressão das demais informações do agregador
total os squares received: 10
sum of squares: 285
Расширение применимости:
Эта концепция мультиплексирования — лишь одна из нескольких, которые мы можем использовать для обеспечения асинхронной работы при решении различных задач. Его применение варьируется от запросов к внешним API до выполнения дорогостоящих алгоритмов в небольших потоках.
Источники
-
МУЛЬТИПЛЕКСИРОВАНИЕ КАНАЛОВ В GO — https://katcipis.github.io/blog/mux-channels-go/
-
Мои го-решения на 2017 год — https://research.swtch.com/go2017
-
Мультиплексор — https://pt.wikipedia.org/wiki/Multiplexador