异步编程
当程序需要与外部世界交互时,例如通过因特网与另一台计算机交互时,程序中的操作可以以不可预测的顺序执行。 假设程序需要下载一个文件。 在开始下载操作之后,期望执行其它操作,直到下载完成,之后将恢复对需要下载的文件的代码的执行。 此场景属于异步编程领域,有时也称为并行(因为同时执行多个操作)。
使用"任务"对象的基本操作
任务可以表示为需要执行的某些计算工作单元的描述符。 它的生命周期包括创建、启动、执行和完成等阶段。 任务是通过为没有要执行的参数的函数调用`Task`构造函数或使用宏来创建的 '@任务':
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
'@task x’相当于'Task(()->x)`。
此任务等待五秒钟,然后输出"完成"。 但是,它还没有开始执行。 它可以通过调用函数随时启动 '附表`:
julia> schedule(t);
如果您尝试在REPL中执行此操作,您将看到`schedule`函数立即返回控制权。 原因是它只是将一个`t`添加到要完成的任务的内部队列中。 然后在REPL中显示以下数据输入提示。 当键盘输入挂起时,其他任务可能正在运行,因此任务’t’正在运行。 `t’调用函数 'sleep',它设置一个计时器并停止执行。 如果计划了其他任务,则此时可以执行这些任务。 五秒钟后,计时器触发并再次启动任务"t",输出"完成"。 之后,任务’t’完成。
功能 `wait'阻止导致它的任务的执行,直到完成一些其他任务。 例如,如果输入,
julia> schedule(t); wait(t)
而不仅仅是调用`schedule`,下一个输入提示将在五秒钟内出现。 原因是REPL在继续工作之前等待任务`t`完成。
通常需要创建一个任务并安排它立即执行。 为此提供了宏。 `@async':调用'@async x’相当于调用’schedule(@task x)'。
与渠道的互动
在某些情况下,工作单元与函数调用无关:要执行的任务中没有显式调用者或可调用对象。 一个例子是源-使用者方案,其中一个复杂过程生成值,另一个使用它们。 消费者不能简单地调用source函数来获取值,因为source可能还没有创建所有值,并且还没有准备好提供它们。 使用任务时,源和使用者可以根据需要运行,根据需要交换数据。
让我们定义一个通过调用方法创建值的源任务。 '放!. 为了获得这些值,有必要在一个新的任务中安排消费者的执行。 您可以使用特殊构造函数来运行链接到通道的任务。 'Channel',它采用单参数函数作为参数。 然后,您可以使用该方法重复从通道中获取值 '拿!:
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
源('生产者')似乎能够重复返回控制。 通话之间 '放!'源的执行被暂停,并且消费者接收控制。
返回通道 'Channel'可以用作’for`循环中的可迭代对象。 在这种情况下,循环变量接受所有接收到的值。 通道关闭后循环结束。
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
设计师 'Task'接受没有参数的函数,而方法 `Channel',它创建一个链接到任务的通道,接受一个具有该类型的单个参数的函数 '通道'。 标准方法是参数化源。 在这种情况下,您需要使用部分函数来创建 匿名函数没有参数或有一个参数。
对于对象 `Task'这可以直接或使用辅助宏完成。:
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# или, что то же самое:
taskHdl = @task mytask(7)
结合设计师组织更复杂的工作分配模式 '任务'和 `Channel'函数可以使用 `绑定'和 `schedule',明确地将一组渠道与一组源和消费者任务相关联。
有关频道的其他信息
通道可以可视化为管道,一端写入,另一端读取。:
-
不同的任务可以通过调用同时记录到同一个频道 '放!`.
-
不同的任务可以通过调用同时读取数据 '拿!`.
-
例子::
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # и функция `foo`, которая считывает элементы из c1, обрабатывает полученный элемент # и записывает результат в c2, function foo() while true data = take!(c1) [...] # обработка данных put!(c2, result) # запись результата end end # можно запланировать к одновременному выполнению `n` экземпляров `foo`. for _ in 1:n errormonitor(@async foo()) end
-
通道是使用'Channel'构造函数创建的{T}(sz)'。 通道将只包含类型为`T’的对象。 如果未指定类型,则通道可以包含任何类型的对象。 'sz’是通道在任何给定时间可以包含的最大元素数。 例如,'Channel(32)'创建一个通道,该通道可以容纳不超过32个任何类型的对象。 '频道{MyType}(64)`在任何给定时间最多可容纳64个`MyType`对象。
-
最初,对象 'Channel'处于打开状态。 这意味着数据可以自由地从中读取并使用调用写入。 '拿!
和 '放!. 功能 `close'关闭对象 '通道'。 如果对象是 `频道'关闭,呼叫 '放!'会失败。 例如:
julia> c = Channel(2); julia> put!(c, 1) # Вызов `put!` для открытого канала завершается успешно. 1 julia> close(c); 朱莉娅>放!(c,2)#Call'put!'导致关闭通道的异常。 错误:InvalidStateException:通道已关闭。 [医]堆垛机: [...]
-
方法 '拿!'和方法 `fetch'(接收一个值但不删除它)对于一个关闭的通道,任何可用的值都被成功返回,直到通道变为空。 让我们继续上面的例子。
julia> fetch(c) # Любое количество вызовов`fetch` завершается успешно. 1 julia> fetch(c) 1 julia> take!(c) # При первом вызове `take!` значение удаляется. 1 julia> take!(c) # В закрытом канале больше нет данных. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
让我们来看一个使用通道在任务之间进行交互的简单示例。 让我们运行四个任务来处理来自单个"作业"通道的数据。 通道包含由’job_id’标识的作业。 此示例中的每个任务读取’job_id`,等待随机的时间量,并将由`job_id’和超时组成的元组写入结果通道。 最后,输出来自"结果"通道的所有数据。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # Имитирует время, затрачиваемое на реальную работу
# обычно выполняемую где-то еще.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий
julia> for i in 1:4 # запускает 4 задачи для параллельной обработки запросов
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # Выводим результаты
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
而不是’errormonitor(t)`,更可靠的解决方案可能是使用`bind(results,t)`函数,因为这不仅会注册所有意外失败,还会强制关闭所有关联的资源,并且会传递异常。
其他任务操作
任务操作基于低级原语 'yieldto'。 调用’yieldto(task,value)'会挂起当前任务,切换到`task`参数中指定的任务,并分配最后一次调用 'yieldto'对于此任务,返回’value’中指定的值。 注意了: 'yieldto'是唯一应该使用任务式执行顺序的操作;而不是调用和返回控制,我们只是切换到另一个任务。 这就是为什么任务也被称为"对称协程":切换到和从每个任务使用相同的机制发生。
'yieldto'是一个非常有效的函数,但在使用任务时很少直接调用它。 让我们来看看可能的原因。 如果您从当前任务切换,那么您可能需要在一段时间后切换回它。 但为了确定切换的正确时刻,以及负责这种切换的任务,需要严格协调动作。 例如,方法 '放!`和 '拿!'阻止其状态保存在通道使用上下文中的操作,以便可以识别消费者。 恰恰是没有必要跟踪任务-消费者手动制作方法 '放!'与低级函数相比,更容易使用 'yieldto'。
此外 'yieldto',需要几个更基本的功能才能有效地使用任务。
-
'currenttask'返回当前运行任务的引用。
-
'istaskdone'请求任务是否已完成其工作。
-
`istaskstarted'请求任务是否已启动。
-
`task_local_storage'操作与当前任务关联的键值对存储。
任务和事件
任务切换通常是由于等待事件(例如I/O请求)而发生的,并且由Julia中基本模块中包含的调度程序执行。 调度程序维护一个可以启动的任务队列,并执行一个事件循环,该事件循环在外部事件发生时重新启动任务,例如消息。
等待事件的基本功能 — '等待'。 功能 'wait'实现了许多对象。 例如,对于’Process’对象,函数 'wait'正在等待它的执行完成。 功能 'wait'通常是隐含的;例如, `wait'可以在调用内部执行 `read'等待数据可用性。
在所有这些情况下,功能 `wait'最终与对象一起工作 'Condition',它负责排队和重新启动任务。 当任务调用函数时 `wait'当条件发生时 'Condition',则标记为未启动,加入条件队列并切换到调度器。 之后,调度程序选择要运行的另一个任务或阻止等待外部事件。 如果一切正常,事件处理程序最终会调用该函数 'notify',以便挂起的任务可以再次运行。