Asynchronous programming
When a program needs to interact with the outside world, for example by communicating with another machine over the Internet, operations in the program may need to happen in an unpredictable order. Say your program needs to download a file. We would like to initiate the download, perform other operations while waiting for it to complete, and then resume the code that needs the downloaded file once it is available. This sort of scenario falls in the domain of asynchronous programming, sometimes also referred to as concurrent programming (since, conceptually, multiple things are happening at once).
To address these scenarios, Julia provides tasks, represented by `Task` objects (also known by several other names, such as symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations). When a piece of computational work (in practice, executing a particular function) is designated as a `Task`, it becomes possible to interrupt it by switching to another `Task`. The original `Task` can be resumed later, at which point it picks up exactly where it left off. At first glance this may seem similar to a function call, but there are two key differences. First, switching tasks does not use any stack space, so any number of task switches can occur without consuming the call stack. Second, switching among tasks can occur in any order, unlike function calls, where the called function must finish executing before control returns to the calling function.
Basic operations with Task objects
You can think of a task as a handle to a unit of computational work to be performed. It has a create-start-run-finish lifecycle. Tasks are created by calling the `Task` constructor on a zero-argument function to run, or by using the `@task` macro:
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
`@task x` is equivalent to `Task(()->x)`.
This task will wait for five seconds and then print `done`. However, it has not started running yet. We can run it whenever we are ready by calling `schedule`:
julia> schedule(t);
If you try this in the REPL, you will see that `schedule` returns immediately. That is because it simply adds `t` to an internal queue of tasks to run. The REPL then prints the next prompt and waits for more input. Waiting for keyboard input gives other tasks an opportunity to run, so at that point `t` starts. `t` calls `sleep`, which sets a timer and stops execution. If other tasks have been scheduled, they can run in the meantime. After five seconds, the timer fires and restarts `t`, which prints `done`. `t` is then finished.
The `wait` function blocks the calling task until some other task finishes. So, for example, if you type
julia> schedule(t); wait(t)
instead of only calling `schedule`, you will see a five-second pause before the next input prompt appears. That is because the REPL waits for `t` to finish before proceeding.
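The create, schedule, and wait steps described above can be put together in a short script. This is a minimal sketch rather than part of the original example: the `finished` flag and the other variable names are hypothetical, the sleep is shortened to 0.1 seconds, and `istaskstarted` and `istaskdone` are used only to observe the task state.

```julia
# Hypothetical flag used to observe when the task body has run.
finished = Ref(false)

# Create the task; nothing runs yet.
t = @task begin
    sleep(0.1)
    finished[] = true
end
started_before = istaskstarted(t)        # the task has only been created

# `schedule` only queues the task and returns immediately.
schedule(t)
finished_after_schedule = finished[]     # t has not had a chance to run yet

# `wait` blocks the current task until t completes.
wait(t)
println("finished: ", finished[], ", done: ", istaskdone(t))
```

Because the current task does not give up control between `schedule(t)` and `wait(t)`, the body of `t` can only run once `wait` is reached.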
It is common to want to create a task and schedule it right away, so the macro `@async` is provided for that purpose: `@async x` is equivalent to `schedule(@task x)`.
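To illustrate how tasks started with `@async` overlap while they wait, here is a small sketch. The names `events`, `slow`, and `fast` are hypothetical, and the sleep durations are arbitrary values chosen so that the second task finishes first.

```julia
# Shared log of completion order (hypothetical name).
events = String[]

# Both tasks are created and scheduled immediately by @async.
slow = @async begin
    sleep(0.2)
    push!(events, "slow")
end
fast = @async begin
    sleep(0.05)
    push!(events, "fast")
end

# Wait for both; while one task sleeps, the other can run.
wait(slow)
wait(fast)
println(events)
```

Although `slow` was started first, `fast` records its entry first because each `sleep` suspends only the task that called it.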
Interaction with channels
In some problems, the various pieces of required work are not naturally related by function calls; there is no obvious "caller" or "callee" among the jobs that need to be done. An example is the producer-consumer problem, where one complex procedure generates values and another complex procedure consumes them. The consumer cannot simply call a producer function to get a value, because the producer may have more values to generate and so might not yet be ready to return. With tasks, the producer and consumer can both run as long as they need to, passing values back and forth as necessary.
Julia provides a `Channel` mechanism for solving this problem. A `Channel` is a waitable first-in, first-out queue that can have multiple tasks reading from and writing to it.
Let's define a producer task, which produces values via the `put!` call. To consume values, we need to schedule the producer to run in a new task. A special `Channel` constructor that accepts a single-argument function can be used to run a task bound to a channel. We can then `take!` values repeatedly from the channel object:
julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
One way to think of this behavior is that `producer` was able to return multiple times. Between calls to `put!`, the producer's execution is suspended and the consumer has control.
The returned `Channel` can be used as an iterable object in a `for` loop, in which case the loop variable takes on all the produced values. The loop terminates when the channel is closed.
julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop
Note that we did not have to explicitly close the channel in the producer. This is because binding a `Channel` to a `Task` ties the open lifetime of the channel to that of the bound task. The channel is closed automatically when the task terminates. Multiple channels can be bound to a single task, and vice versa.
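The automatic closing described above can be observed directly. The following is a minimal sketch with a hypothetical producer named `squares`; it relies on `collect` iterating the channel until it is closed and on `isopen` reporting the channel state.

```julia
# Hypothetical producer: emits three values and then simply returns.
function squares(c::Channel)
    for n in 1:3
        put!(c, n^2)
    end
end

# The Channel constructor runs `squares` in a task bound to the channel.
ch = Channel(squares)

# Iteration stops once the bound task has finished and the channel is closed.
vals = collect(ch)
println(vals, " open: ", isopen(ch))
```

No call to `close` appears in `squares`; the channel is closed because the task bound to it has finished.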
The `Task` constructor expects a zero-argument function, whereas the `Channel` method that creates a task-bound channel expects a function that accepts a single argument of type `Channel`. A common pattern is for the producer to be parameterized, in which case partial function application is needed to create a zero- or single-argument anonymous function.
For `Task` objects this can be done either directly or by use of a convenience macro:
function mytask(myarg)
    ...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently:
taskHdl = @task mytask(7)
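The same partial-application idea applies to task-bound channels. The sketch below is an illustration only: `countdown` is a hypothetical parameterized producer, wrapped in a single-argument anonymous function (or an equivalent `do` block) so that it fits the `Channel` constructor.

```julia
# Hypothetical producer that takes an extra parameter besides the channel.
function countdown(c::Channel, start::Int)
    for n in start:-1:1
        put!(c, n)
    end
end

# Fix `start` with an anonymous function of one argument (the channel).
ch = Channel(c -> countdown(c, 3))
received = collect(ch)

# The do-block form passes the same kind of one-argument function.
ch2 = Channel() do c
    countdown(c, 2)
end
received2 = collect(ch2)

println(received, " ", received2)
```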
To orchestrate more advanced work distribution patterns, `bind` and `schedule` can be used in conjunction with the `Task` and `Channel` constructors to explicitly link a set of channels with a set of producer and consumer tasks.
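As a rough illustration of linking a channel and a task by hand, the sketch below creates both separately and then connects them with `bind` before calling `schedule`. The names `ch` and `producer_task` and the buffer size of 4 are arbitrary choices for this example.

```julia
# Create the channel and the producer task independently.
ch = Channel{Int}(4)
producer_task = @task begin
    for i in 1:3
        put!(ch, i)
    end
end

# Tie the channel's lifetime to the task, then start the task.
bind(ch, producer_task)
schedule(producer_task)

# The consumer loop ends when the bound task finishes and the channel closes.
items = collect(ch)
println(items, " open: ", isopen(ch))
```

This is what the single-argument `Channel` constructor does in one step; doing it explicitly makes it possible to bind several channels to one task, or one channel to several tasks.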
Additional information about channels
A channel can be visualized as a pipe: it has a write end and a read end.
- Multiple writers in different tasks can write to the same channel concurrently via `put!` calls.
- Multiple readers in different tasks can read data concurrently via `take!` calls.
- As an example:
  # Given Channels c1 and c2,
  c1 = Channel(32)
  c2 = Channel(32)
  # and a function `foo` which reads items from c1, processes the item read
  # and writes a result to c2,
  function foo()
      while true
          data = take!(c1)
          [...]               # process data
          put!(c2, result)    # write out result
      end
  end
  # we can schedule `n` instances of `foo` to be active concurrently.
  for _ in 1:n
      errormonitor(@async foo())
  end
- Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects of type `T`. If the type is not specified, the channel can hold objects of any type. `sz` is the maximum number of elements that the channel can hold at any time. For example, `Channel(32)` creates a channel that can hold a maximum of 32 objects of any type. A `Channel{MyType}(64)` can hold up to 64 objects of `MyType` at any time.
- If a `Channel` is empty, readers (on a `take!` call) block until data is available.
- If a `Channel` is full, writers (on a `put!` call) block until space becomes available.
- `isready` tests for the presence of at least one object in the channel, while `wait` waits for an object to become available.
- A `Channel` is initially in an open state, which means it can be read from and written to freely via `take!` and `put!` calls. `close` closes a `Channel`. On a closed `Channel`, `put!` will fail. For example:
  julia> c = Channel(2);
  julia> put!(c, 1) # `put!` on an open channel succeeds.
  1
  julia> close(c);
  julia> put!(c, 2) # `put!` on a closed channel throws an exception.
  ERROR: InvalidStateException: Channel is closed.
  Stacktrace:
  [...]
- `take!` and `fetch` (which retrieves but does not remove the value) on a closed channel successfully return any existing values until the channel is emptied. Continuing the above example:
  julia> fetch(c) # Any number of `fetch` calls succeed.
  1
  julia> fetch(c)
  1
  julia> take!(c) # The first `take!` removes the value.
  1
  julia> take!(c) # No more data is available on a closed channel.
  ERROR: InvalidStateException: Channel is closed.
  Stacktrace:
  [...]
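The blocking and closing rules in the list above can be exercised in one short script. This is a sketch under simple assumptions: a capacity of 2 is chosen so the buffer fills quickly, all variable names are hypothetical, and a short `sleep` is used to give the extra writer time to block.

```julia
c = Channel{Int}(2)
empty_at_start = !isready(c)      # nothing has been written yet

put!(c, 10)
put!(c, 20)                       # the buffer is now full

# A third put! would block the current task, so run it in its own task.
writer = @async put!(c, 30)
sleep(0.1)                        # give the writer time to start and block
writer_blocked = !istaskdone(writer)

head = take!(c)                   # frees one slot, so the writer can proceed
wait(writer)

peeked = fetch(c)                 # returns the next value without removing it
close(c)
rest = [take!(c), take!(c)]       # remaining values are still readable

# Once the closed channel is empty, take! throws.
closed_error = try
    take!(c)
    false
catch err
    err isa InvalidStateException
end
println(head, " ", peeked, " ", rest, " ", closed_error)
```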
Consider a simple example that uses channels for communication between tasks. We start four tasks to process data from a single `jobs` channel. Jobs, identified by an id (`job_id`), are written to the channel. Each task in this simulation reads a `job_id`, waits for a random amount of time, and writes a tuple of the `job_id` and the simulated time to the `results` channel. Finally, everything in the `results` channel is printed.
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;
julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with n jobs
julia> for i in 1:4 # start 4 tasks to process requests concurrently
           errormonitor(@async do_work())
       end
julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
Instead of `errormonitor(t)`, a more robust solution may be to use `bind(results, t)`, as that will not only log any unexpected failures, but also force the associated resources to close and propagate the exception to everything waiting on them.
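To make the effect of `bind` on failures more concrete, here is a small sketch in which a worker fails after producing one value. The `worker` task and its error message are invented for the illustration; the example only checks that some exception reaches the reader and that the channel ends up closed, without relying on the exact exception type.

```julia
results = Channel{Int}(0)

# Hypothetical worker: produces one value and then fails.
worker = @async begin
    put!(results, 1)
    error("worker failed")        # simulated failure
end

# Bind the channel to the worker so that a failure closes the channel
# and is reported to tasks waiting on it.
bind(results, worker)

first_value = take!(results)      # the value produced before the failure

# The next take! does not block forever: the worker's failure is propagated.
saw_error = try
    take!(results)
    false
catch err
    true
end
println(first_value, " ", saw_error, " open: ", isopen(results))
```

Without `bind`, the second `take!` would wait indefinitely on a channel that nobody will ever write to again.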
Additional task operations
Task operations are built on a low-level primitive called `yieldto`. `yieldto(task, value)` suspends the current task, switches to the specified `task`, and causes that task's last `yieldto` call to return the specified `value`. Notice that `yieldto` is the only operation required to use task-style control flow; instead of calling and returning, we are always just switching to a different task. This is why tasks are also called "symmetric coroutines": each task is switched to and from using the same mechanism.
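The value-passing behavior of `yieldto` can be seen in a deliberately tiny sketch. All names here are hypothetical, and the example is not a recommended pattern: the second task must always switch back explicitly, and it is intentionally left suspended rather than allowed to finish, because nothing would switch back to the main task if it simply returned.

```julia
main_task = current_task()
received = Ref{Any}(nothing)

other = @task begin
    # Switch back to the main task; this call returns only when the main
    # task switches here again, yielding whatever value it passes.
    received[] = yieldto(main_task, "from other")
    # Switch back once more instead of finishing.
    yieldto(main_task, "other is done")
end

# Start `other`; control comes back when it switches to main_task.
first_reply = yieldto(other)

# Resume `other`; its pending yieldto call returns 42.
second_reply = yieldto(other, 42)

println(first_reply, " | ", received[], " | ", second_reply)
```

Each switch has to name its target explicitly, which is exactly the bookkeeping that higher-level operations take care of.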
`yieldto` is powerful, but most uses of tasks do not invoke it directly. Consider why this might be. If you switch away from the current task, you will probably want to switch back to it at some point, but knowing when to switch back, and knowing which task is responsible for switching back, can require considerable coordination. For example, `put!` and `take!` are blocking operations which, when used with channels, maintain state to remember who the consumers are. Not needing to keep track of the consuming task manually is what makes `put!` easier to use than the low-level `yieldto`.
In addition to `yieldto`, a few other basic functions are needed to use tasks effectively.
- `current_task` gets a reference to the currently running task.
- `istaskdone` queries whether a task has exited.
- `istaskstarted` queries whether a task has run yet.
- `task_local_storage` manipulates a key-value store specific to the current task.
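The four functions listed above can be tried together in a few lines. The sketch below uses a hypothetical key `:request_id` and hypothetical variable names; it shows that values stored with `task_local_storage` inside one task are not visible from another.

```julia
seen = Ref(0)                 # value read back inside the task
same_task = Ref(false)        # whether current_task() matched inside the task

t = @task begin
    # Store and read a value in this task's own key-value store.
    task_local_storage(:request_id, 7)
    seen[] = task_local_storage(:request_id)
end
inner = @task (same_task[] = current_task() === inner)

started_before = istaskstarted(t)
schedule(t); wait(t)
schedule(inner); wait(inner)

# The main task has its own, separate storage.
visible_in_main = haskey(task_local_storage(), :request_id)
println(seen[], " ", istaskdone(t), " ", visible_in_main)
```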
Tasks and events
Most task switches occur as a result of waiting for events such as I/O requests, and are performed by a scheduler included in Julia's `Base` module. The scheduler maintains a queue of runnable tasks and executes an event loop that restarts tasks based on external events such as message arrival.
The basic function for waiting for an event is `wait`. Several objects implement `wait`; for example, given a `Process` object, `wait` will wait for it to exit. `wait` is often implicit; for example, a `wait` can happen inside a call to `read` to wait for data to become available.
In all of these cases, `wait` ultimately operates on a `Condition` object, which is in charge of queueing and restarting tasks. When a task calls `wait` on a `Condition`, the task is marked as non-runnable, added to the condition's queue, and switches to the scheduler. The scheduler then picks another task to run, or blocks waiting for external events. If all goes well, an event handler eventually calls `notify` on the condition, which causes the tasks waiting for that condition to become runnable again.
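The `wait`/`notify` handshake described above can be sketched with an explicit `Condition`. The names and the message string are hypothetical, the main task plays the role of the event handler, and a short `sleep` is used to make sure the waiting task has reached `wait` before `notify` is called.

```julia
cond = Condition()
received = Ref{Any}(nothing)

# The waiter blocks on the condition until it is notified.
waiter = @async begin
    received[] = wait(cond)       # returns the value passed to notify
end

sleep(0.1)                        # let the waiter start and block on cond
blocked = !istaskdone(waiter)

# Acting as the event handler: wake the tasks waiting on cond.
woken = notify(cond, "data ready")
wait(waiter)
println(woken, " task woken, received: ", received[])
```

A notification is not remembered: a `notify` issued before any task is waiting wakes nobody, which is why the sketch waits for the waiter to block first.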