Engee documentation
Notebook

Asynchronous programming and multithreading

Asynchronous programming

Typically, tasks are executed sequentially (synchronously) within a thread. If a thread runs on a single-core machine and a task blocks, the program must wait for that task to finish before moving on to other work. Asynchronous tasks, also known as coroutines, aim to maximise efficiency within a single thread by splitting the work into multiple tasks and letting the scheduler switch between them quickly. This means tasks can start and stop without excessive use of system resources; in other words, coroutines provide non-blocking execution. Coroutines are useful when the work involves event handling, producer-consumer processes, or waiting for I/O operations such as network requests or file operations.
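As a minimal sketch of non-blocking execution (the variable names here are illustrative, not from the examples below), a task scheduled with `@async` runs concurrently with the main code, which only blocks when it actually asks for the result:

```julia
# A minimal sketch of non-blocking execution with coroutines (tasks).
# While the scheduled task sleeps, the main code keeps running.
t = @async begin
    sleep(1)      # simulate waiting for an I/O operation
    "done"
end
println("the main code is not blocked")  # printed immediately
result = fetch(t) # blocks only here, until the task finishes
println(result)
```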

For example, suppose some data transformation takes a long time. This process is modelled by the following function:

In [ ]:
function long_process()
    sleep(3) # simulate a long-running process
    return 42.0 # return some result
end
Out[0]:
long_process (generic function with 1 method)

Such a process will take too long if a large amount of data needs to be processed. For example, the following code block takes a little more than 9 seconds to run, because three functions are called sequentially, each of which takes approximately 3 seconds to execute:

In [ ]:
@elapsed begin
    p1 = long_process() # simulate a long data transformation
    p2 = long_process() # simulate a long data transformation
    p3 = long_process() # simulate a long data transformation
end
Out[0]:
9.005899319

The code block above is a typical scenario in which actions execute sequentially and their durations add up to the total execution time. Fortunately, the total execution time can be reduced to the length of the longest process. This can be achieved with coroutines, using the Task type:

In [ ]:
@elapsed begin
    t1 = Task(long_process); schedule(t1)
    t2 = Task(long_process); schedule(t2)
    t3 = Task(long_process); schedule(t3)
    (fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.012623215

The same result was achieved in just over 3 seconds. Each long_process was run as a separate unit of work, a task. This separation allows the processor to switch between tasks during execution and make progress on several tasks at once.
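A task also carries its own state, which can be inspected. A short sketch (reusing the long_process function defined above): right after scheduling, the 3-second task cannot have finished yet, and fetch blocks the caller until the task completes:

```julia
function long_process()
    sleep(3)                    # simulate a long-running process
    return 42.0
end

t = Task(long_process)          # create the task (not yet scheduled)
schedule(t)                     # hand it over to the scheduler
done_before = istaskdone(t)     # false: the 3-second task has not finished yet
result = fetch(t)               # block until the task completes and get its result
done_after = istaskdone(t)      # true after fetch returns
```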

image_6.png

You can achieve the same result using the @task macro:

In [ ]:
@elapsed begin
    t1 = @task long_process(); schedule(t1)
    t2 = @task long_process(); schedule(t2)
    t3 = @task long_process(); schedule(t3)
    (fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.017859429

Multithreading

In the previous examples, the tasks ran concurrently with each other within a single thread. Strictly speaking, such concurrency is not true parallelism. Asynchronous tasks can be very useful, but sometimes real parallelism is needed. Julia allows you to schedule tasks on many threads at once. The Threads.@spawn macro can be used to wrap a call to the long_process function: for each process it creates a task and schedules it on the first thread that becomes available:

In [ ]:
@elapsed begin
    s1 = Threads.@spawn long_process()
    s2 = Threads.@spawn long_process()
    s3 = Threads.@spawn long_process()
    (fetch(s1), fetch(s2), fetch(s3))
end
Out[0]:
3.027746895

To verify that Threads.@spawn creates and schedules a task, you can inspect its return value:

In [ ]:
Threads.@spawn sleep(1)
Out[0]:
Task (runnable) @0x00007f96aad98330
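Note that Threads.@spawn only yields a real speedup when Julia has more than one thread available (for example, when started with `julia -t N` or with the JULIA_NUM_THREADS environment variable set). A quick sketch to check this (the variable names are illustrative):

```julia
# Check how many threads are available to the scheduler.
# With a single thread, Threads.@spawn still works,
# but all tasks share that one thread.
n = Threads.nthreads()
println("Threads available: ", n)

# The id of the thread a spawned task actually runs on:
t = Threads.@spawn Threads.threadid()
println("Task ran on thread ", fetch(t))
```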

Running tasks on multiple available threads can be simplified with the Threads.@threads macro. To execute a for loop in multiple threads, simply add the @threads prefix to the loop. The resulting code block takes about 3 seconds to execute, because each long_process is started as a new task and assigned to an available thread (as in the example with @spawn).

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
end

The @threads macro simplifies scheduling tasks on the available threads, but it provides no way to obtain the results of the tasks without a special construct for passing data: a Channel. When creating a channel, you can specify its element type and capacity (the maximum number of elements it can hold). For example, Channel{String}(12) creates a channel that can hold up to 12 string elements at a time. If the type is omitted, the channel accepts elements of type Any. Let's create a channel without specifying the element type:

In [ ]:
ch = Channel(32)
Out[0]:
Channel{Any}(32) (empty)
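For comparison, a typed channel like the Channel{String}(12) mentioned above only accepts string elements and hands them out in FIFO order (the variable names in this sketch are illustrative):

```julia
# A typed channel: holds at most 12 String elements at a time.
sch = Channel{String}(12)
put!(sch, "hello")   # non-blocking while the channel is not full
put!(sch, "world")
msg = take!(sch)     # "hello": elements come out in FIFO order
```

Attempting to put! a value that cannot be converted to String into this channel raises an error, which helps catch mistakes when many tasks write to the same channel.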

An empty channel (empty) has been created. Now let's write data into it using the put! function:

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
    put!(ch, result) # write the task result to channel ch with put!
end
ch
Out[0]:
Channel{Any}(32) (3 items available)

Three results of the long_process function were written to the channel. To display the first of them without removing it, we use the fetch function:

In [ ]:
fetch(ch)
Out[0]:
42.0

To retrieve the next element from the channel, we use the take! function:

In [ ]:
take!(ch)
Out[0]:
42.0

Unlike fetch, this function both retrieves an element from the channel and removes it.
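Because take! removes elements, a channel can be drained in a loop: isready reports whether any elements remain. A small self-contained sketch (with a fresh illustrative channel, not the ch used above):

```julia
# Drain a channel: isready(ch) is true while elements remain.
ch2 = Channel(4)
for x in (1.0, 2.0)
    put!(ch2, x)
end

results = Float64[]
while isready(ch2)
    push!(results, take!(ch2))  # take! removes each element in FIFO order
end
```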

Conclusion:

This example demonstrated the basics of working with asynchronous tasks and multithreading, as well as passing task results between tasks using channels.