Документация Engee

Асинхронное программирование и многопоточность

Асинхронное программирование

Как правило, задачи выполняются последовательно или синхронно в некотором потоке. Если поток запущен на одноядерном компьютере, а задача является блокирующей, то это означает, что программа должна завершить выполнение потока, прежде чем переходить к другим задачам. Асинхронные задачи, также известные как сопрограммы, нацелены на максимальную эффективность в рамках одного потока разделяя задачи на несколько потоков и позволяя ядру быстро переключаться между ними. Это означает, что задачи могут запускаться и останавливаться без излишнего использования системных ресурсов. Другими словами, сопрограммы обеспечивают неблокирующее выполнение. Сопрограммы оказываются полезными, когда задачи включают обработку событий, процессы "производитель-потребитель" или ожидание операций ввода-вывода, таких как сетевые запросы или файловые операции.

Например, выполнение некоторого преобразования данных занимает много времени. Этот процесс описывается следующей функцией:

function long_process()
    sleep(3) # имитация длительного процесса
    return 42.0 # возврат некоторого результата
end
long_process (generic function with 1 method)

Такой процесс займёт слишком много времени, если потребуется обработать большое количество данных. Например, выполнение следующего блока кода занимает чуть более 9 секунд, поскольку последовательно вызываются 3 функции , каждая из которых выполняется приблизительно 3 секунды:

@elapsed begin
    p1 = long_process() # имитация длительного преобразования данных
    p2 = long_process() # имитация длительного преобразования данных
    p3 = long_process() # имитация длительного преобразования данных
end
9.005899319

Приведенный выше блок кода представляет собой типичный сценарий, где действия выполняются последовательно и приводят к суммированию времени выполнения. К счастью, общее время выполнения можно сократить до величины самого продолжительного процесса. Этого можно достигнуть с помощью сопрограммы, Task:

@elapsed begin
t1 = Task(long_process); schedule(t1)
t2 = Task(long_process); schedule(t2)
t3 = Task(long_process); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
3.012623215

Тот же результат был достигнут всего за 3 секунды. Каждый long_process был запущен как отдельная рабочая единица, или задача. Это разделение позволяет процессору переключаться между задачами во время выполнения и добиваться прогресса в выполнении более чем одной задачи одновременно.

image_6.png

Для достижения тех же результатов можно использовать макрос @task:

@elapsed begin
t1 = @task long_process(); schedule(t1)
t2 = @task long_process(); schedule(t2)
t3 = @task long_process(); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
3.017859429

Многопоточность

В предыдущих примерах каждая задача выполнялась параллельно с другими в одном потоке. В строгом смысле, такой параллелизм - это не параллелизм. Асинхронные задачи могут быть очень полезны, но иногда требуется настоящий параллелизм. Julia позволяет планировать задачи во многих потоках сразу. Макрос Threads.@spawn можно использовать для перезаписи функции long_process. Для каждого процесса создайте задачу и запланируйте ее выполнение в любом потоке, как только он станет доступным:

@elapsed begin
s1 = Threads.@spawn long_process()
s2 = Threads.@spawn long_process()
s3 = Threads.@spawn long_process()
(fetch(s1), fetch(s2), fetch(s3))
end
3.027746895

Чтобы убедиться в том, что Threads.@spawn создает и планирует задачи, можно проверить возвращаемый тип:

Threads.@spawn sleep(1)
Task (runnable) @0x00007f96aad98330

Выполнение задач в нескольких доступных потоках может быть упрощено с помощью макроса Threads.@threads. Чтобы выполнить многопоточный цикл for, просто добавьте к циклу префикс @threads. Выполнение результирующего блока кода занимает ~3 секунды, потому что каждый long_process запускается как новая задача и распределяется по доступному потоку (как в примере, где используется @spawn).

Threads.@threads for _ in 1:3
    result = long_process()
end

Макрос @threads упрощает планирование задач в доступных потоках, но результат выполнения задачи не получить без специальной конструкции для вывода данных - Channel (канал). При создании канала можно указать его тип и размер (объявляется количество входов). Например, Channel{String}(12) создает канал, который может содержать 12 строковых элементов одновременно. Если тип опущен, канал разрешит ввод данных типа Any. Создадим канал без указания типа данных:

ch = Channel(32)
Channel{Any}(32) (empty)

Был создан пустой канал (empty). Теперь запишем в него данные с помощью функции put!:

Threads.@threads for _ in 1:3
    result = long_process()
    put!(ch, result) # запись результата выполнения задачи с помощью put! в канал ch
end
ch
Channel{Any}(32) (3 items available)

В канал были записаписаны 3 результата выполнения функции long_process, для того чтобы отобразить первый из них необходимо воспользоваться функцией fetch:

fetch(ch)
42.0

Чтобы извлечь следующий элемент канала воспользуемся функцией take!:

take!(ch)
42.0

Данная функция как извлекает, так и удаляет элемент из канала.

Вывод

В данном примере были продемонстрированы основы работы с асинхронными задачами и многопоточностью, а также вывод данных с помощью каналов.