异步编程
当程序需要与外部世界进行交互时,例如通过互联网与另一台机器进行通信时,程序中的操作可能需要以不可预测的顺序发生。 假设你的程序需要下载一个文件。 我们希望启动下载操作,在等待它完成的同时执行其他操作,然后在可用时恢复需要下载文件的代码。 这种情况属于异步编程领域,有时也称为并发编程(因为从概念上讲,多个事情同时发生)。
基本 任务 运作
你可以想到一个 任务 作为要执行的计算工作单元的句柄。 它有一个create-start-run-finish生命周期。 任务是通过调用 任务 要运行的0参数函数的构造函数,或使用 @任务宏:
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@任务x 相当于 任务(()->x).
此任务将等待五秒钟,然后打印 完成了. 但是,它还没有开始运行。 我们可以随时打电话来运行它 时间表:
julia> schedule(t);
如果你在REPL中尝试这个,你会看到 时间表 立即返回。 那是因为它只是增加了 t 到要运行的任务的内部队列。 然后,REPL将打印下一个提示并等待更多输入。 等待键盘输入提供了其他任务运行的机会,因此在这一点上 t 将开始。 t 电话 睡觉,它设置一个计时器并停止执行。 如果计划了其他任务,则可以运行这些任务。 五秒钟后,计时器会触发并重新启动 t,你会看到 完成了 印刷出来的。 t 然后完成。
该 等等!函数阻塞调用任务,直到其他任务完成。 因此,例如,如果您键入
julia> schedule(t); wait(t)
而不是只打电话 时间表,在下一个输入提示出现之前,您将看到五秒钟的暂停。 那是因为REPL正在等待 t 在继续之前完成。
想要创建一个任务并立即计划它是很常见的,所以宏 线程。@产卵是为此目的而提供的 --- 线程。@产卵x 相当于 任务=@任务x;任务。sticky=false;schedule(任务).
与渠道沟通
在某些问题中,所需工作的各个部分不是通过函数调用自然相关的;在需要完成的工作中没有明显的"调用者"或"被调用者"。 一个例子是生产者-消费者问题,其中一个复杂的过程正在生成值,而另一个复杂的过程正在消耗它们。 消费者不能简单地调用生产者函数来获取值,因为生产者可能有更多的值要生成,因此可能还没有准备好返回。 对于任务,生产者和消费者都可以根据需要运行,并根据需要来回传递值。
让我们定义一个生产者任务,它通过 放!呼叫。 为了使用值,我们需要安排生产者在一个新的任务中运行。 特别的 频道接受1-arg函数作为参数的构造函数可用于运行绑定到通道的任务。 然后我们可以 拿!从通道对象重复的值:
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
考虑这种行为的一种方法是 生产商 能够多次返回。 在调用之间 放!,生产者的执行暂停,消费者拥有控制权。
返回的 频道可以用作一个可迭代的对象 为 循环,在这种情况下,循环变量接受所有产生的值. 当通道关闭时,环路终止。
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
请注意,我们不必在producer中显式关闭通道。 这是因为绑定a的行为 频道到a 任务将通道的开放生存期与绑定任务的开放生存期相关联。 当任务终止时,通道对象会自动关闭。 多个通道可以绑定到一个任务,反之亦然。
而 任务 构造函数需要一个0参数函数, xref:base/parallel.adoc#Base.Channel[频道创建任务绑定通道的方法需要接受类型的单个参数的函数 频道. 一个常见的模式是生产者被参数化,在这种情况下,需要一个部分函数应用程序来创建一个0或1参数 匿名功能。
为 任务对象这可以直接完成,也可以通过使用方便的宏来完成:
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)
更多有关频道的资料
一个通道可以被可视化为一个管道,即它有一个写端和一个读端 :
*不同任务中的多个写入器可以通过 xref:base/parallel.adoc#Base.put!-Tuple{Channel,%20Any}[放! 电话。
*不同任务中的多个读取器可以通过 xref:base/io-network.adoc#Base.take!-Tuple{Base.GenericIOBuffer}[拿! 电话。
*作为一个例子:
+
# Given Channels c1 and c2,
c1 = Channel(32)
c2 = Channel(32)
# and a function `foo` which reads items from c1, processes the item read
# and writes a result to c2,
function foo()
while true
data = take!(c1)
[...] # process data
put!(c2, result) # write out result
end
end
# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
errormonitor(Threads.@spawn foo())
end
*频道是通过 频道{T}(sz) 构造函数。 通道将只保存类型的对象 T. 如果未指定类型,则通道可以保存任何类型的对象。 sz 指在任何时候可以保持在通道中的元素的最大数量。 例如, 频道(32) 创建一个最多可容纳32个任何类型对象的通道。 A 频道{MyType}(64) 最多可容纳64个物体 [医]类型 在任何时候。
*如果 频道是空的,读者(在一个 拿!调用)将阻塞,直到数据可用(见 [医空位]).
*如果 频道)满了,作家们 xref:base/parallel.adoc#Base.put!-Tuple{Channel,%20Any}[放!调用)将阻止,直到空间变得可用(见 伊斯富尔).
* 已经准备好了测试通道中是否存在任何对象,而 等等!等待对象变为可用。
*请注意,如果另一个任务当前正在等待 放! 一个对象进入一个通道,一个通道可以有比它的容量更多的可用项.
*一个 频道初始处于打开状态。 这意味着它可以通过 拿!和 放! 电话。 xref:base/io-network.adoc#Base.close[接近/接近关闭a 频道. 在关闭 频道, 放!会失败。 例如:
+
julia> c = Channel(2);
julia> put!(c, 1) # `put!` on an open channel succeeds
1
julia> close(c);
julia> put!(c, 2) # `put!` on a closed channel throws an exception.
ERROR: InvalidStateException: Channel is closed.
Stacktrace:
[...]
+
julia> fetch(c) # Any number of `fetch` calls succeed.
1
julia> fetch(c)
1
julia> take!(c) # The first `take!` removes the value.
1
julia> take!(c) # No more data available on a closed channel.
ERROR: InvalidStateException: Channel is closed.
Stacktrace:
[...]
考虑一个使用通道进行任务间通信的简单示例。 我们开始4个任务来处理数据从一个单一的 工作机会 频道。 作业,由id标识(工作_id),都写入通道。 在这个模拟中的每个任务读取一个 工作_id,等待一个随机的时间量,并写回一个元组 工作_id 和模拟到结果通道的时间。 最后所有的 结果 被打印出来。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(Threads.@spawn make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
errormonitor(Threads.@spawn do_work())
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
而不是 勘误者(t),更健壮的解决方案可能是使用 绑定(结果,t),因为这不仅会记录任何意外的故障,而且还会强制关联的资源关闭并将异常传播到任何地方。
更多任务操作
任务操作是建立在一个叫做的低级原语上的 耶尔德托. yieldto(任务,价值) 挂起当前任务,切换到指定的 任务,并导致该任务的最后 耶尔德托调用返回指定的 价值. 请注意 耶尔德托是使用任务式控制流所需的唯一操作;而不是调用和返回,我们总是只是切换到不同的任务。 这就是为什么这个特性也被称为"对称协程";每个任务都被切换到使用相同的机制。
耶尔德托功能强大,但大多数任务的使用都不会直接调用它。 考虑一下为什么会这样。 如果您切换离开当前任务,您可能会希望在某个时候切换回它,但是知道何时切换回来,并且知道哪个任务有责任切换回来,可能需要相当大的协调。 例如, 放!和 拿!是阻塞操作,当在通道的上下文中使用时,保持状态以记住消费者是谁。 不需要手动跟踪消费任务是什么 放!比低级更容易使用 耶尔德托.
除了 耶尔德托,需要一些其他基本功能来有效地使用任务。
* 当前任务获取当前正在运行的任务的引用。
* 伊斯塔斯克多内查询任务是否已退出。
* 开始了查询任务是否已运行。
* 任务_local_storage操作特定于当前任务的键值存储。
任务和事件
大多数任务切换是由于等待i/O请求等事件而发生的,并且由Julia Base中包含的调度程序执行。 调度程序维护一个可运行任务的队列,并执行一个事件循环,该事件循环根据外部事件(如消息到达)重新启动任务。
在所有这些情况下, 等等!最终在一个 条件对象,负责排队和重新启动任务。 当任务调用时 等等!在一个 条件,任务标记为不可运行,添加到条件的队列中,并切换到调度程序。 然后,调度程序将选择另一个任务来运行,或阻止等待外部事件。 如果一切顺利,最终会调用一个事件处理程序 通知条件,这会导致等待该条件的任务再次变为可运行。
通过调用显式创建的任务 任务最初不为调度器所知。 这允许您使用以下方法手动管理任务 xref:base/parallel.adoc#Base.yieldto[耶尔德托 如果你愿意的话。 但是,当这样的任务等待事件时,它仍然会在事件发生时自动重新启动,正如您所期望的那样。