
Asynchronous programming and multithreading

Asynchronous programming

As a rule, tasks are executed sequentially, or synchronously, in a single thread. If the thread is running
on a single-core computer and a task is blocking, the program must complete
that task before proceeding to other tasks.
Asynchronous tasks, also known as coroutines, aim to maximize efficiency within a single thread
by dividing the work into multiple tasks and allowing the scheduler to quickly switch between
them. This means that tasks can be suspended and resumed without using up system resources unnecessarily.
In other words, coroutines provide non-blocking execution.
Coroutines are useful when tasks include event handling, producer-consumer processes, or
waiting for I/O operations such as network requests or file operations.

For example, suppose that some
data transformation takes a long time. This process is modeled by the following
function:

In [ ]:
function long_process()
    sleep(3) # simulate a long-running process
    return 42.0 # return some result
end
Out[0]:
long_process (generic function with 1 method)

This process will take too long if you need to process a large amount of data. For example, the execution
of the following code block takes just over 9 seconds, since the function is called 3 times sequentially
and each call runs for about 3 seconds:

In [ ]:
@elapsed begin
    p1 = long_process() # simulate a long data transformation
    p2 = long_process() # simulate a long data transformation
    p3 = long_process() # simulate a long data transformation
end
Out[0]:
9.005899319

The above code block represents a typical scenario where actions are performed sequentially and result in a cumulative execution time. Fortunately, the total execution time can be reduced to the length of the longest
process. This can be achieved using a coroutine, which Julia represents with the Task type:

In [ ]:
@elapsed begin
    t1 = Task(long_process); schedule(t1)
    t2 = Task(long_process); schedule(t2)
    t3 = Task(long_process); schedule(t3)
    (fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.012623215

The same result was achieved in just 3 seconds. Each long_process call was started
as a separate unit of work, or task. This separation allows the scheduler to switch to another task
whenever the current one is waiting (here, inside sleep), so that several tasks make progress over the same period.
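
The switching between tasks can be made visible with a small sketch. The names worker and events are hypothetical and not part of the original example. The sketch calls yield() to hand control back to the scheduler explicitly, whereas long_process does so implicitly while it sleeps.

```julia
# Hypothetical illustration: two tasks take turns within one thread.
events = String[]            # shared log of what ran, in order

function worker(name)
    for step in 1:2
        push!(events, "$name: step $step")
        yield()              # hand control back to the scheduler
    end
end

ta = @task worker("A")
tb = @task worker("B")
schedule(ta); schedule(tb)
wait(ta); wait(tb)           # block until both tasks have finished

foreach(println, events)
```

On a typical run the steps of A and B appear interleaved rather than back to back, since each task gives up control after every step.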


To achieve the same result, you can use the @task macro:

In [ ]:
@elapsed begin
    t1 = @task long_process(); schedule(t1)
    t2 = @task long_process(); schedule(t2)
    t3 = @task long_process(); schedule(t3)
    (fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.017859429
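
A task created with @task does not run until it is scheduled. The following minimal sketch shows the task lifecycle using the standard functions istaskstarted and istaskdone. The function quick_process is a hypothetical, shorter stand-in for long_process, used only to keep the example fast.

```julia
# Hypothetical stand-in for long_process with a shorter delay.
function quick_process()
    sleep(0.1)
    return 42.0
end

t = @task quick_process()
started_before = istaskstarted(t)  # the task exists but has not run yet
schedule(t)                        # hand the task to the scheduler
result = fetch(t)                  # wait for completion and get the return value
done_after = istaskdone(t)

println((started_before, result, done_after))
```

Here fetch both waits for the task and returns its result, which is why the earlier examples need no separate wait call.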

Multithreading

In the previous examples, each task was executed concurrently with the others in the same thread. Strictly speaking, such concurrency is not parallelism. Asynchronous tasks can be very useful, but sometimes real parallelism is required. Julia allows
you to schedule tasks on many threads at once.
The Threads.@spawn macro can be used to rewrite the long_process example. For each process,
it creates a task and schedules it to run on any thread as soon as one becomes available:

In [ ]:
@elapsed begin
    s1 = Threads.@spawn long_process()
    s2 = Threads.@spawn long_process()
    s3 = Threads.@spawn long_process()
    (fetch(s1), fetch(s2), fetch(s3))
end
Out[0]:
3.027746895

To make sure that Threads.@spawn creates and schedules tasks, you can check the return type:

In [ ]:
Threads.@spawn sleep(1)
Out[0]:
Task (runnable) @0x00007f96aad98330
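
Whether spawned tasks actually run in parallel depends on how many threads the Julia session was started with. The sketch below checks this with the standard functions nthreads and threadid. The variable names are illustrative. How the thread count is configured in a particular environment is an assumption here: in a standalone Julia installation it is set at startup, for example with the --threads option or the JULIA_NUM_THREADS environment variable.

```julia
using Base.Threads: nthreads, threadid, @spawn

println("threads available: ", nthreads())

# Each spawned task reports the id of the thread it ran on.
tasks = [@spawn threadid() for _ in 1:4]
ids = fetch.(tasks)
println("tasks ran on threads: ", ids)
```

With a single thread, all tasks report the same id: they still run concurrently, but not in parallel.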

Running tasks on multiple available threads can be simplified by using the Threads.@threads macro. To perform a multithreaded for loop, simply prefix the loop with Threads.@threads.
The resulting code block takes ~3 seconds when at least three threads are available, because the iterations,
each of which calls long_process, are distributed across the available threads (as in the example where @spawn is used).

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
end

The @threads macro simplifies the scheduling of tasks on available threads, but the loop itself does not return the results of its iterations. To collect them, you need a dedicated construct for passing data out, such as a Channel. When creating a channel, you can specify its element type and size (the number of items it can hold). For example, Channel{String}(12) creates
a channel that can contain 12 string elements simultaneously. If the type is omitted, the channel
will accept items of the Any type. Creating a channel without specifying the data type:

In [ ]:
ch = Channel(32)
Out[0]:
Channel{Any}(32) (empty)
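
For comparison, here is a minimal sketch of a typed channel. The variable names labels and rejected are hypothetical. A typed channel converts each item to its element type on put!, so an item that cannot be converted is rejected with an error.

```julia
labels = Channel{String}(12)   # holds up to 12 String items
put!(labels, "alpha")

# Trying to store an Int fails, because it cannot be converted to String.
rejected = try
    put!(labels, 1)
    false
catch err
    err isa MethodError
end

println("stored: ", fetch(labels), ", Int rejected: ", rejected)
```

Declaring the element type catches mistakes like this early and lets Julia generate more specific code than it can for Channel{Any}.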

An empty channel has been created. Now let's write data to it using the put! function:

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
    put!(ch, result) # write the task's result to channel ch using put!
end
ch
Out[0]:
Channel{Any}(32) (3 items available)

Three results of the long_process function were written to the channel. To view the first of them without removing it from the channel, use the fetch function:

In [ ]:
fetch(ch)
Out[0]:
42.0

To extract an element from the channel, use the take! function:

In [ ]:
take!(ch)
Out[0]:
42.0

This function both returns an element and removes it from the channel.
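
The difference between fetch and take!, and one way to read all remaining items, can be sketched as follows. The channel results is a hypothetical stand-in for ch, filled directly with values instead of calling long_process so that the example runs quickly.

```julia
results = Channel{Float64}(32)
for _ in 1:3
    put!(results, 42.0)        # stand-in for the output of long_process
end

peeked = fetch(results)        # reads the first item, leaves it in the channel
taken = take!(results)         # reads the first item and removes it

close(results)                 # no more items will be written
remaining = collect(results)   # drains the items still buffered

println((peeked, taken, length(remaining)))
```

Closing the channel before collecting matters: iterating over an open channel blocks and waits for more items once the buffer is empty.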

Conclusion:

This example demonstrated the basics of working with asynchronous tasks and multithreading, as well as data output using channels.