Engee documentation

Asynchronous programming

When a program needs to interact with the outside world, for example with another computer via the Internet, operations in the program can be performed in an unpredictable sequence. Let’s say the program needs to download a file. After starting the download operation, it is desirable that other operations are performed until the download is completed, after which the execution of the code that needs the downloaded file resumes. This scenario belongs to the field of asynchronous programming, which is also sometimes called parallel (since several operations are performed simultaneously).

Julia has facilities for such situations. Task — tasks (they can be called in other ways: symmetric coroutines, lightweight threads, collaborative multitasking, or one-time continuations). When any computational work (in practice, it is usually a specific function) is performed as Task, it becomes possible to interrupt it by switching to another object Task. The original task Task can be resumed later, and its execution will continue exactly from the place where it was interrupted. At first glance, it might seem like a function call. However, there are two main differences. Firstly, no space is used when switching tasks, so you can switch tasks as often as you want without using up the call stack. Secondly, switching between tasks can occur in any order, while when functions are called, control is returned to the calling function only after the execution of the called one is completed.

Basic operations with Task objects

A task can be represented as a descriptor for some unit of computational work that needs to be performed. Its lifecycle consists of the stages of creation, launch, execution, and completion. Tasks are created by calling the Task constructor for a function without arguments to be executed, or using a macro @task:

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

'@task x` is equivalent to Task(()->x).

This task waits five seconds and then outputs done'. However, it has not started executing yet. It can be started at any time by calling the function `schedule:

julia> schedule(t);

If you try to do this in the REPL, you will see that the schedule function immediately returns control. The reason is that it just adds a t to the internal queue of tasks to be completed. The following data entry prompt is then displayed in the REPL. While keyboard input is pending, other tasks may be running, so task t is running. 't` calls the function sleep, which sets a timer and stops execution. If other tasks are scheduled, they can be performed at this time. After five seconds, the timer fires and starts task t again, which outputs done'. After that, task `t is completed.

Function 'wait` blocks the execution of the task that caused it until some other task is completed. For example, if you enter,

julia> schedule(t); wait(t)

instead of just calling schedule, the next input prompt will appear in five seconds. The reason is that the REPL waits for task t to finish before continuing to work.

It is often necessary to create a task and schedule it for execution immediately. A macro is provided for this. '@async`: Calling @async x is equivalent to calling `schedule(@task x)'.

Interaction with channels

In some situations, the units of work are not related to function calls: there are no explicit callers or callable objects among the tasks to be performed. An example is the source-consumer scenario, where one complex procedure generates values and another uses them. The consumer cannot simply call the source function to get the value, because the source may not have created all the values yet and is not ready to provide them. When using tasks, the source and consumer can run as long as needed, exchanging data as needed.

Julia has a channel mechanism to solve this problem (Channel). Channel is a waiting queue that operates on the principle of "first in, first out": multiple tasks can write data to it or read data from it at once.

Let’s define a source task that creates values by calling a method. put!. To get the values, it is necessary to schedule the execution of the consumer in a new task. You can use a special constructor to run a task linked to a channel. Channel, which takes a single-argument function as an argument. Then you can repeatedly take values from the channel using the method take!:

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

The source (producer') seems to be able to return control repeatedly. Between calls `put! the execution of the source is suspended, and the consumer receives control.

Return channel Channel can be used as an iterable object in the 'for` loop. In this case, the loop variable accepts all received values. The cycle ends after the channel is closed.

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

Pay attention: we didn’t have to explicitly close the channel in the source. The reason is that when linking an object Channel to the object Task The channel opening time will depend on the linked task. The channel closes automatically when the task is completed. You can link multiple channels to a single task, and vice versa.

Designer Task accepts a function without arguments, while the method Channel, which creates a channel linked to a task, accepts a function with a single argument of the type Channel. The standard approach is to parameterize the source. In this case, you need to use a partial function to create anonymous function with no arguments or with one argument.

For objects Task this can be done directly or using an auxiliary macro.:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# или, что то же самое:
taskHdl = @task mytask(7)

To organize more complex work distribution patterns in combination with designers Task and Channel functions can be used bind and schedule, explicitly associating a set of channels with a set of source and consumer tasks.

Additional information about channels

The channel can be visualized as a pipeline, with writing at one end and reading at the other.:

  • Different tasks can simultaneously record to the same channel through calls put!.

  • Different tasks can simultaneously read data through calls take!.

  • Example:

    # Given Channels c1 and c2,
    
    c1 = Channel(32)
    c2 = Channel(32)
    # и функция `foo`, которая считывает элементы из c1, обрабатывает полученный элемент
    # и записывает результат в c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # обработка данных
            put!(c2, result)    # запись результата
        end
    end
    
    # можно запланировать к одновременному выполнению `n` экземпляров `foo`.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • Channels are created using the Channel' constructor{T}(sz). The channel will contain only objects of type T'. If the type is not specified, the channel can contain objects of any type. 'sz is the maximum number of elements that a channel can contain at any given time. For example, Channel(32) creates a channel that can hold no more than 32 objects of any type. Channel{MyType}(64) can hold up to 64 MyType objects at any given time.

  • If the object is Channel is empty, read operations (when called take!) are blocked until the data becomes available.

  • If the object is Channel is full, recording operations (when calling put!) are blocked until free space becomes available.

  • Method 'isready` checks for the presence of at least one object in the channel, and wait waits for an object to become available.

  • Initially, the object Channel is in the open state. This means that data can be freely read from it and written to it using calls. take! and put!. Function close closes the object Channel. If the object is Channel closed, call put! will fail. For example:

    julia> c = Channel(2);
    
    julia> put!(c, 1) # Вызов `put!` для открытого канала завершается успешно.
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # Call `put!` causes an exception for a closed channel.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]
  • Method take! and the method fetch (which receives a value but does not delete it) for a closed channel, any available values are successfully returned until the channel becomes empty. Let’s continue with the above example.

    julia> fetch(c) # Любое количество вызовов`fetch` завершается успешно.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # При первом вызове `take!` значение удаляется. 1
    
    julia> take!(c) # В закрытом канале больше нет данных.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

Let’s look at a simple example of using channels to interact between tasks. Let’s run four tasks to process data from a single jobs' channel. Jobs identified by `job_id are recorded in the channel. Each task in this example reads the job_id, waits a random amount of time, and writes a tuple consisting of the job_id and the timeout to the results channel. Finally, all the data from the results channel is output.

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # Имитирует время, затрачиваемое на реальную работу
                                               # обычно выполняемую где-то еще.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий

julia> for i in 1:4 # запускает 4 задачи для параллельной обработки запросов
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # Выводим результаты
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

Instead of errormonitor(t), a more reliable solution may be to use the bind(results, t) function, since this will not only register all unexpected failures, but also force all associated resources to close, and exceptions will be passed on.

Additional task operations

Task operations are based on a low-level primitive yieldto. Calling yieldto(task, value) suspends the current task, switches to the task specified in the task argument, and assigns the last call yieldto for this task, return the value specified in the value. Pay attention: yieldto is the only operation that should use task-style execution order; instead of calling and returning control, we simply switch to another task. This is why tasks are also called "symmetric coroutines": switching to and from each task occurs using the same mechanism.

yieldto is a very effective function, but it is rarely called directly when using tasks. Let’s look at the possible reasons. If you are switching from the current task, then you may probably need to switch back to it after some time. But in order to determine the right moment of switching, as well as the task that is responsible for such switching, strict coordination of actions is required. For example, the methods put! and take! block operations whose state is saved in the context of channel usage so that consumers can be identified. It is precisely the absence of the need to track the task-the consumer manually makes the method put! is easier to use compared to a low-level function yieldto.

Besides yieldto, several more basic functions are required to use tasks effectively.

Tasks and events

Task switches most often occur as a result of waiting for events, such as I/O requests, and are performed by the scheduler included in the Base module in Julia. The scheduler maintains a queue of tasks that can be started, and executes an event loop that restarts tasks when external events occur, such as a message.

The basic function for waiting for an event — wait. Function wait implements a number of objects. For example, for the Process object, the function wait is waiting for its execution to complete. Function wait is often implicit; for example, wait can be executed inside a call read to wait for data availability.

In all these cases, the function wait eventually works with the object Condition, which is responsible for queuing and restarting tasks. When a task calls a function wait when a condition occurs Condition, it is marked as not to be started, added to the condition queue and switched to the scheduler. After that, the scheduler selects another task to run or blocks waiting for external events. If everything is fine, the event handler eventually calls the function notify for the condition, so that pending tasks become available to run again.

A task created explicitly by calling Task, initially unknown to the scheduler. This allows you to manage tasks manually, if necessary, using yieldto. However, when such a task is waiting for an event, it still restarts automatically when it occurs, as one would expect.