Engee 文档
Notebook

异步编程和多线程

异步编程

通常情况下,任务会在某个线程中顺序或同步执行。如果一个线程在单核计算机上运行 任务是阻塞任务,这意味着程序必须在完成线程执行后才能继续执行其他任务。 这意味着程序必须在执行完该线程后才能继续执行其他任务。 异步任务也称协同程序,旨在通过将任务拆分为多个线程,最大限度地提高单线程内的效率 将任务分割成多个线程,并允许内核在它们之间快速切换。 这意味着任务可以启动和停止。这意味着任务可以在不过度使用系统资源的情况下启动和停止。 换句话说,协程提供了非阻塞执行。 当任务包括事件处理、生产者-消费者进程或等待 I/O 操作时,协程非常有用。 例如,等待 I/O 操作(如网络请求或文件操作)。

例如,执行某些 数据转换需要很长时间。这个过程可以用下面的 函数:

In [ ]:
function long_process()
    sleep(3) # имитация длительного процесса
    return 42.0 # возврат некоторого результата
end
Out[0]:
long_process (generic function with 1 method)

如果需要处理大量数据,这个过程就会耗时过长。例如 执行以下代码块需要 9 秒多一点,因为要依次调用 3 个函数 每个函数的执行时间约为 3 秒:

In [ ]:
@elapsed begin
    p1 = long_process() # имитация длительного преобразования данных
    p2 = long_process() # имитация длительного преобразования данных
    p3 = long_process() # имитация длительного преобразования данных
end
Out[0]:
9.005899319

上述代码块是一个典型的按顺序执行操作并导致总执行时间的情况。幸运的是,总执行时间可以缩短到最长进程的长度。 进程的长度。这可以通过使用协同程序 Task 来实现:

In [ ]:
@elapsed begin
t1 = Task(long_process); schedule(t1)
t2 = Task(long_process); schedule(t2)
t3 = Task(long_process); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.012623215

同样的结果只需 3 秒钟即可实现。每个 long_process 作为一个单独的工作单元或任务运行。这种分离允许处理器在执行过程中在不同任务间切换,并同时在多个任务上取得进展。 在执行过程中切换任务,并同时在多个任务上取得进展。

image_6.png

您可以使用 @task 宏来达到同样的效果:

In [ ]:
@elapsed begin
t1 = @task long_process(); schedule(t1)
t2 = @task long_process(); schedule(t2)
t3 = @task long_process(); schedule(t3)
(fetch(t1), fetch(t2), fetch(t3))
end
Out[0]:
3.017859429

多线程

在前面的例子中,每个任务都是在单线程中与其他任务并行执行的。从严格意义上讲,这种并行不是并行。异步任务可能非常有用,但有时你需要真正的并行。Julia 允许你 在多个线程中同时调度任务。 可以使用Threads.@spawn宏来覆盖long_process函数。为每个进程 创建一个任务,并在任务可用时将其调度到任意线程中:

In [ ]:
@elapsed begin
s1 = Threads.@spawn long_process()
s2 = Threads.@spawn long_process()
s3 = Threads.@spawn long_process()
(fetch(s1), fetch(s2), fetch(s3))
end
Out[0]:
3.027746895

为确保 Threads.@spawn 能创建和调度任务,可以检查返回类型:

In [ ]:
Threads.@spawn sleep(1)
Out[0]:
Task (runnable) @0x00007f96aad98330

使用Threads.@threads宏可以简化在多个可用线程中执行任务的过程。要执行多线程for循环,只需在循环中添加@threads前缀。 这样产生的代码块执行时间约为 3 秒,因为每个 长进程都会作为新任务启动,并分配给一个可用的线程(如使用@spawn的示例)。

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
end

@threads宏简化了可用线程中的任务调度,但如果没有特殊的数据输出结构--Channel,就无法获得任务执行的结果。创建通道时,可以指定其类型和大小(输入的数量已声明)。例如,Channel{String}(12) 创建一个可包含 12 个字符串的通道。 通道{String}(12)一次可包含 12 个字符串元素。如果省略类型,通道 将允许任意**类型的数据输入。让我们创建一个不指定数据类型的通道:

In [ ]:
ch = Channel(32)
Out[0]:
Channel{Any}(32) (empty)

一个空通道(empty)已经创建。现在,让我们使用 put 向其中写入数据!

In [ ]:
Threads.@threads for _ in 1:3
    result = long_process()
    put!(ch, result) # запись результата выполнения задачи с помощью put! в канал ch
end
ch
Out[0]:
Channel{Any}(32) (3 items available)

执行long_process函数的 3 个结果已写入通道,为了显示其中的第一个结果,我们需要使用fetch函数:

In [ ]:
fetch(ch)
Out[0]:
42.0

为了获取通道中的下一个元素,我们将使用take!函数:

In [ ]:
take!(ch)
Out[0]:
42.0

该函数既能从通道中提取元素,也能从通道中删除元素。

输出:

该示例演示了异步任务和多线程工作的基础知识,以及使用通道输出数据。