Асинхронное программирование и многопоточность¶
Асинхронное программирование¶
Как правило, задачи выполняются последовательно или синхронно в некотором потоке. Если поток запущен на одноядерном компьютере, а задача является блокирующей, то это означает, что программа должна завершить выполнение потока, прежде чем переходить к другим задачам. Асинхронные задачи, также известные как сопрограммы, нацелены на максимальную эффективность в рамках одного потока разделяя задачи на несколько потоков и позволяя ядру быстро переключаться между ними. Это означает, что задачи могут запускаться и останавливаться без излишнего использования системных ресурсов. Другими словами, сопрограммы обеспечивают неблокирующее выполнение. Сопрограммы оказываются полезными, когда задачи включают обработку событий, процессы "производитель-потребитель" или ожидание операций ввода-вывода, таких как сетевые запросы или файловые операции.
Например, выполнение некоторого преобразования данных занимает много времени. Этот процесс описывается следующей функцией:
function long_process()
sleep(3) # имитация длительного процесса
return 42.0 # возврат некоторого результата
end
Такой процесс займёт слишком много времени, если потребуется обработать большое количество данных. Например, выполнение следующего блока кода занимает чуть более 9 секунд, поскольку последовательно вызываются 3 функции , каждая из которых выполняется приблизительно 3 секунды:
@elapsed begin
p1 = long_process() # имитация длительного преобразования данных
p2 = long_process() # имитация длительного преобразования данных
p3 = long_process() # имитация длительного преобразования данных
end
Приведенный выше блок кода представляет собой типичный сценарий, где действия выполняются последовательно и приводят к суммированию времени выполнения. К счастью, общее время выполнения можно сократить до величины самого продолжительного процесса. Этого можно достигнуть с помощью сопрограммы, Task:
@elapsed begin
t1 = Task(long_process); schedule(t1)
t2 = Task(long_process); schedule(t2)
t3 = Task(long_process); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
Тот же результат был достигнут всего за 3 секунды. Каждый long_process был запущен как отдельная рабочая единица, или задача. Это разделение позволяет процессору переключаться между задачами во время выполнения и добиваться прогресса в выполнении более чем одной задачи одновременно.
Для достижения тех же результатов можно использовать макрос @task:
@elapsed begin
t1 = @task long_process(); schedule(t1)
t2 = @task long_process(); schedule(t2)
t3 = @task long_process(); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
Многопоточность¶
В предыдущих примерах каждая задача выполнялась параллельно с другими в одном потоке. В строгом смысле, такой параллелизм - это не параллелизм. Асинхронные задачи могут быть очень полезны, но иногда требуется настоящий параллелизм. Julia позволяет планировать задачи во многих потоках сразу. Макрос Threads.@spawn можно использовать для перезаписи функции long_process. Для каждого процесса создайте задачу и запланируйте ее выполнение в любом потоке, как только он станет доступным:
@elapsed begin
s1 = Threads.@spawn long_process()
s2 = Threads.@spawn long_process()
s3 = Threads.@spawn long_process()
(fetch(s1), fetch(s2), fetch(s3))
end
Чтобы убедиться в том, что Threads.@spawn создает и планирует задачи, можно проверить возвращаемый тип:
Threads.@spawn sleep(1)
Выполнение задач в нескольких доступных потоках может быть упрощено с помощью макроса Threads.@threads. Чтобы выполнить многопоточный цикл for, просто добавьте к циклу префикс @threads. Выполнение результирующего блока кода занимает ~3 секунды, потому что каждый long_process запускается как новая задача и распределяется по доступному потоку (как в примере, где используется @spawn).
Threads.@threads for _ in 1:3
result = long_process()
end
Макрос @threads упрощает планирование задач в доступных потоках, но результат выполнения задачи не получить без специальной конструкции для вывода данных - Channel (канал). При создании канала можно указать его тип и размер (объявляется количество входов). Например, Channel{String}(12) создает канал, который может содержать 12 строковых элементов одновременно. Если тип опущен, канал разрешит ввод данных типа Any. Создадим канал без указания типа данных:
ch = Channel(32)
Был создан пустой канал (empty). Теперь запишем в него данные с помощью функции put!:
Threads.@threads for _ in 1:3
result = long_process()
put!(ch, result) # запись результата выполнения задачи с помощью put! в канал ch
end
ch
В канал были записаписаны 3 результата выполнения функции long_process, для того чтобы отобразить первый из них необходимо воспользоваться функцией fetch:
fetch(ch)
Чтобы извлечь следующий элемент канала воспользуемся функцией take!:
take!(ch)
Данная функция как извлекает, так и удаляет элемент из канала.
Вывод:¶
В данном примере были продемонстрированы основы работы с асинхронными задачами и многопоточностью, а также вывод данных с помощью каналов.