Асинхронное программирование
Когда программе необходимо взаимодействовать с внешним миром, например с другим компьютером через Интернет, операции в программе могут выполняться в непредсказуемой очередности. Допустим, программе необходимо скачать файл. После запуска операции скачивания желательно, чтобы выполнялись другие операции, пока скачивание не завершится, после чего возобновится выполнение кода, которому требуется скачанный файл. Такой сценарий относится к области асинхронного программирования, которое также иногда называют параллельным (так как одновременно выполняется несколько операций).
Для таких ситуаций в Julia имеются объекты Task
— задачи (они могут называться и по-другому: симметричные сопрограммы, легковесные потоки, совместная многозадачность или одноразовые продолжения). Когда какая-либо вычислительная работа (на практике это обычно определенная функция) выполняется как Task
, ее становится возможным прервать путем переключения на другой объект Task
. Исходную задачу Task
можно будет позднее возобновить, причем ее выполнение продолжится в точности с того места, где было прервано. На первый взгляд может показаться, что это похоже на вызов функции. Однако есть два основных отличия. Во-первых, при переключении задач пространство не используется, поэтому переключать задачи можно сколь угодно часто, не расходуя стек вызовов. Во-вторых, переключение между задачами может происходить в любом порядке, в то время как при вызове функций управление возвращается вызывающей функции только после завершения выполнения вызванной.
Основные операции с объектами Task
Задачу (Task
) можно представить как дескриптор для некоторой единицы вычислительной работы, которую необходимо выполнить. Ее жизненный цикл состоит из этапов создания, запуска, выполнения и завершения. Задачи создаются путем вызова конструктора Task
для функции без аргументов, которую необходимо выполнить, или с помощью макроса @task
:
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@task x
эквивалентно Task(()->x)
.
Эта задача ожидает пять секунд, а затем выводит done
. Однако она еще не начала выполняться. Ее можно запустить в любой момент, вызвав функцию schedule
:
julia> schedule(t);
Если вы попробуете сделать это в REPL, то увидите, что функция schedule
немедленно возвращает управление. Причина в том, что она просто добавляет t
во внутреннюю очередь задач, подлежащих выполнению. Затем в REPL выводится следующее приглашение к вводу данных. Пока ожидается ввод данных с клавиатуры, могут выполняться другие задачи, поэтому запускается задача t
. t
вызывает функцию sleep
, которая устанавливает таймер и прекращает выполнение. Если запланированы другие задачи, они могут выполняться в это время. Через пять секунд таймер срабатывает и снова запускает задачу t
, которая выводит done
. После этого задача t
завершается.
Функция wait
блокирует выполнение вызвавшей ее задачи до тех пор, пока не завершится какая-либо другая задача. Например, если ввести,
julia> schedule(t); wait(t)
а не просто вызвать schedule
, следующее приглашение к вводу появится через пять секунд. Причина в том, что REPL ожидает завершения задачи t
, прежде чем продолжить работу.
Часто бывает нужно создать задачу и сразу запланировать ее к выполнению. Для этого предусмотрен макрос @async
: вызов @async x
эквивалентен вызову schedule(@task x)
.
Взаимодействие с каналами
В ряде ситуаций единицы работы не связаны с вызовами функций: среди задач, подлежащих выполнению, нет явных вызывающих или вызываемых объектов. Примером может служить сценарий «источник-потребитель», когда одна сложная процедура генерирует значения, а другая — использует их. Потребитель не может просто вызвать функцию-источник для получения значения, так как источник, возможно, еще не создал все значения и не готов их предоставить. При использовании задач источник и потребитель могут выполняться столько, сколько нужно, обмениваясь данными по мере необходимости.
Для решения этой проблемы в Julia имеется механизм каналов (Channel
). Channel
— это очередь ожидания, работающая по принципу «первым поступил — первым обслужен»: в нее могут записывать данные или считывать данные из нее сразу несколько задач.
Давайте определим задачу-источник, которая создает значения посредством вызова метода put!
. Для получения значений необходимо запланировать выполнение потребителя в новой задаче. Для запуска задачи, привязанной к каналу, можно использовать специальный конструктор Channel
, который принимает в качестве аргумента функцию с одним аргументом. Затем можно многократно принимать значения из канала с помощью метода take!
:
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
Источник (producer
) словно бы способен многократно возвращать управление. Между вызовами put!
выполнение источника приостанавливается, и управление получает потребитель.
Возвращаемый канал Channel
можно использовать как итерируемый объект в цикле for
. В этом случае переменная цикла принимает все полученные значения. Цикл завершается после закрытия канала.
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
Обратите внимание: нам не пришлось явным образом закрывать канал в источнике. Причина в том, что при привязке объекта Channel
к объекту Task
время открытия канала будет зависеть от привязанной задачи. Канал автоматически закрывается при завершении задачи. К одной задаче можно привязать несколько каналов, и наоборот.
Конструктор Task
принимает функцию без аргументов, в то время как метод Channel
, создающий привязанный к задаче канал, принимает функцию с одним аргументом типа Channel
. Стандартным подходом является параметризация источника. В этом случае требуется применение частичной функции для создания анонимной функции без аргументов или с одним аргументом.
Для объектов Task
это можно делать напрямую или с помощью вспомогательного макроса:
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# или, что то же самое:
taskHdl = @task mytask(7)
Для организации более сложных шаблонов распределения работы в сочетании с конструкторами Task
и Channel
можно использовать функции bind
и schedule
, явным образом связывая набор каналов с набором задач-источников и задач-потребителей.
Дополнительные сведения о каналах
Канал можно наглядно представить как конвейер, с одного конца производится запись, а с другого — чтение:
-
Разные задачи могут одновременно выполнять запись в один канал посредством вызовов
put!
. -
Разные задачи могут одновременно считывать данные посредством вызовов
take!
. -
Пример:
# При наличии каналов c1 и c2 c1 = Channel(32) c2 = Channel(32) # и функции `foo`, которая считывает элементы из c1, обрабатывает полученный элемент # и записывает результат в c2, function foo() while true data = take!(c1) [...] # обработка данных put!(c2, result) # запись результата end end # можно запланировать к одновременному выполнению `n` экземпляров `foo`. for _ in 1:n errormonitor(@async foo()) end
-
Каналы создаются с помощью конструктора
Channel{T}(sz)
. Канал будет содержать только объекты типаT
. Если тип не указан, канал может содержать объекты любого типа.sz
— это максимальное количество элементов, которое может содержать канал в любой момент времени. Например,Channel(32)
создает канал, который может вмещать не более 32 объектов любого типа.Channel{MyType}(64)
может вмещать в любой момент времени до 64 объектов типаMyType
. -
Если объект
Channel
пуст, операции чтения (при вызовеtake!
) блокируются, пока данные не станут доступны. -
Если объект
Channel
заполнен, операции записи (при вызовеput!
) блокируются, пока не станет доступно свободное место. -
Метод
isready
проверяет наличие хотя бы одного объекта в канале, аwait
ожидает, когда станет доступен объект. -
Изначально объект
Channel
находится в открытом состоянии. Это означает, что данные можно свободно считывать из него и записывать в него с помощью вызововtake!
иput!
. Функцияclose
закрывает объектChannel
. Если объектChannel
закрыт, вызовput!
завершится ошибкой. Пример:julia> c = Channel(2); julia> put!(c, 1) # Вызов `put!` для открытого канала завершается успешно. 1 julia> close(c); julia> put!(c, 2) # Вызов `put!` для закрытого канала вызывает исключение. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
-
Метод
take!
и методfetch
(который получает значение, но не удаляет его) для закрытого канала успешно возвращают любые имеющиеся значения, пока канал не станет пустым. Продолжим приведенный выше пример.julia> fetch(c) # Любое количество вызовов `fetch` завершается успешно. 1 julia> fetch(c) 1 julia> take!(c) # При первом вызове `take!` значение удаляется. 1 julia> take!(c) # В закрытом канале больше нет данных. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
Рассмотрим простой пример использования каналов для взаимодействия между задачами. Запустим четыре задачи для обработки данных из одного канала jobs
. В канал записываются задания, идентифицируемые по job_id
. Каждая задача в этом примере считывает job_id
, ожидает случайное количество времени и записывает в канал результатов кортеж, состоящий из job_id
и времени ожидания. Наконец, выводятся все данные из канала results
.
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # Имитирует время, затрачиваемое на реальную работу
# обычно выполняемую где-то еще
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий
julia> for i in 1:4 # запускает 4 задачи для параллельной обработки запросов
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # Выводим результаты
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
Вместо errormonitor(t)
более надежным решением может быть использование функции bind(results, t)
, так при этом не только будут регистрироваться все непредвиденные сбои, но и будут принудительно закрываться все связанные ресурсы, а исключения будут передаваться далее.
Дополнительные операции с задачами
Операции с задачами основаны на низкоуровневом примитиве yieldto
. Вызов yieldto(task, value)
приостанавливает текущую задачу, выполняет переключение на задачу, указанную в аргументе task
, и предписывает последнему вызову yieldto
этой задачи вернуть указанное в value
значение. Обратите внимание: yieldto
— это единственная операция, которая должна использовать порядок выполнения в стиле задач; вместо вызова и возврата управления мы просто переключаемся на другую задачу. Вот почему задачи также называют «симметричными сопрограммами»: переключение на каждую задачу и из нее происходит с использованием одного и того же механизма.
yieldto
— это очень эффективная функция, но при использовании задач она редко вызывается напрямую. Разберем возможные причины. Если вы переключаетесь из текущей задачи, то, вероятно, через какое-то время может потребоваться снова переключиться на нее. Но для определения нужного момента переключения, а также задачи, которая отвечает за такое переключение, требуется строгое согласование действий. Например, методы put!
и take!
блокируют операции, состояние которых в контексте использования каналов сохраняется, чтобы можно было определить потребители. Именно отсутствие необходимости отслеживать задачу-потребитель вручную делает метод put!
проще в использовании по сравнению с низкоуровневой функцией yieldto
.
Помимо yieldto
, для эффективного использования задач требуется еще несколько базовых функций.
-
current_task
возвращает ссылку на выполняющуюся в данный момент задачу. -
istaskdone
запрашивает, завершила ли задача работу. -
istaskstarted
запрашивает, была ли задача запущена. -
task_local_storage
оперирует хранилищем пар «ключ-значение», связанным с текущей задачей.
Задачи и события
Чаще всего переключения задач происходят в результате ожидания событий, например запросов ввода-вывода, и производятся планировщиком, входящим в состав модуля Base в Julia. Планировщик ведет очередь задач, которые могут быть запущены, и выполняет цикл событий, который перезапускает задачи при наступлении внешних событий, например поступлении сообщения.
Базовая функция для ожидания события — wait
. Функцию wait
реализует ряд объектов. Например для объекта Process
функция wait
ожидает завершения его выполнения. Функция wait
часто неявная; например, wait
может выполняться внутри вызова read
для ожидания доступности данных.
Во всех этих случаях функция wait
в конечном итоге работает с объектом Condition
, который отвечает за постановку задач в очередь и их перезапуск. Когда задача вызывает функцию wait
при наступлении условия Condition
, она помечается как не подлежащая запуску, добавляется в очередь условия и переключается на планировщик. После этого планировщик выбирает другую задачу для запуска или блокирует ожидание внешних событий. Если все в порядке, обработчик события в конечном итоге вызывает функцию notify
для условия, так что ожидающие его выполнения задачи снова становятся доступны для запуска.