Engee 文档

异步编程

当程序需要与外部世界交互时,例如通过因特网与另一台计算机交互时,程序中的操作可以以不可预测的顺序执行。 假设程序需要下载一个文件。 在开始下载操作之后,期望执行其它操作,直到下载完成,之后将恢复对需要下载的文件的代码的执行。 此场景属于异步编程领域,有时也称为并行(因为同时执行多个操作)。

朱莉娅有这种情况的设施。 `Task'--任务(它们可以通过其他方式调用:对称协程,轻量级线程,协作多任务或一次性连续)。 当任何计算工作(在实践中,它通常是一个特定的功能)被执行为 'Task',通过切换到另一个对象来中断它成为可能 '任务'。 原来的任务 `Task'可以在以后恢复,它的执行将完全从中断的地方继续。 乍一看,它可能看起来像一个函数调用。 但是,有两个主要区别。 首先,切换任务时不使用空间,因此您可以随意切换任务,而不会占用调用堆栈。 其次,任务之间的切换可以以任何顺序发生,而当函数被调用时,只有在被调用函数的执行完成后,控制才返回给调用函数。

使用"任务"对象的基本操作

任务可以表示为需要执行的某些计算工作单元的描述符。 它的生命周期包括创建、启动、执行和完成等阶段。 任务是通过为没有要执行的参数的函数调用`Task`构造函数或使用宏来创建的 '@任务':

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

'@task x’相当于'Task(()->x)`。

此任务等待五秒钟,然后输出"完成"。 但是,它还没有开始执行。 它可以通过调用函数随时启动 '附表`:

julia> schedule(t);

如果您尝试在REPL中执行此操作,您将看到`schedule`函数立即返回控制权。 原因是它只是将一个`t`添加到要完成的任务的内部队列中。 然后在REPL中显示以下数据输入提示。 当键盘输入挂起时,其他任务可能正在运行,因此任务’t’正在运行。 `t’调用函数 'sleep',它设置一个计时器并停止执行。 如果计划了其他任务,则此时可以执行这些任务。 五秒钟后,计时器触发并再次启动任务"t",输出"完成"。 之后,任务’t’完成。

功能 `wait'阻止导致它的任务的执行,直到完成一些其他任务。 例如,如果输入,

julia> schedule(t); wait(t)

而不仅仅是调用`schedule`,下一个输入提示将在五秒钟内出现。 原因是REPL在继续工作之前等待任务`t`完成。

通常需要创建一个任务并安排它立即执行。 为此提供了宏。 `@async':调用'@async x’相当于调用’schedule(@task x)'。

与渠道的互动

在某些情况下,工作单元与函数调用无关:要执行的任务中没有显式调用者或可调用对象。 一个例子是源-使用者方案,其中一个复杂过程生成值,另一个使用它们。 消费者不能简单地调用source函数来获取值,因为source可能还没有创建所有值,并且还没有准备好提供它们。 使用任务时,源和使用者可以根据需要运行,根据需要交换数据。

Julia有一个通道机制来解决这个问题('频道')。 'Channel'是一个以"先进先出"原则运作的等待队列:多个任务可以一次向其中写入数据或从中读取数据。

让我们定义一个通过调用方法创建值的源任务。 '放!. 为了获得这些值,有必要在一个新的任务中安排消费者的执行。 您可以使用特殊构造函数来运行链接到通道的任务。 'Channel',它采用单参数函数作为参数。 然后,您可以使用该方法重复从通道中获取值 '拿!:

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

源('生产者')似乎能够重复返回控制。 通话之间 '放!'源的执行被暂停,并且消费者接收控制。

返回通道 'Channel'可以用作’for`循环中的可迭代对象。 在这种情况下,循环变量接受所有接收到的值。 通道关闭后循环结束。

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

注意:我们不必在源中明确关闭通道。 原因是链接对象时 `通道'到对象 `任务'通道打开时间将取决于链接的任务。 通道在任务完成时自动关闭。 您可以将多个通道链接到单个任务,反之亦然。

设计师 'Task'接受没有参数的函数,而方法 `Channel',它创建一个链接到任务的通道,接受一个具有该类型的单个参数的函数 '通道'。 标准方法是参数化源。 在这种情况下,您需要使用部分函数来创建 匿名函数没有参数或有一个参数。

对于对象 `Task'这可以直接或使用辅助宏完成。:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# или, что то же самое:
taskHdl = @task mytask(7)

结合设计师组织更复杂的工作分配模式 '任务'`Channel'函数可以使用 `绑定'`schedule',明确地将一组渠道与一组源和消费者任务相关联。

有关频道的其他信息

通道可以可视化为管道,一端写入,另一端读取。:

  • 不同的任务可以通过调用同时记录到同一个频道 '放!`.

  • 不同的任务可以通过调用同时读取数据 '拿!`.

  • 例子::

    # Given Channels c1 and c2,
    
    c1 = Channel(32)
    c2 = Channel(32)
    # и функция `foo`, которая считывает элементы из c1, обрабатывает полученный элемент
    # и записывает результат в c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # обработка данных
            put!(c2, result)    # запись результата
        end
    end
    
    # можно запланировать к одновременному выполнению `n` экземпляров `foo`.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • 通道是使用'Channel'构造函数创建的{T}(sz)'。 通道将只包含类型为`T’的对象。 如果未指定类型,则通道可以包含任何类型的对象。 'sz’是通道在任何给定时间可以包含的最大元素数。 例如,'Channel(32)'创建一个通道,该通道可以容纳不超过32个任何类型的对象。 '频道{MyType}(64)`在任何给定时间最多可容纳64个`MyType`对象。

  • 如果对象是 'Channel'为空,读操作(调用时 '拿!')被阻止,直到数据变得可用。

  • 如果对象是 'Channel'已满,记录操作(调用时 '放!')被阻塞,直到可用空间可用。

  • 方法 `isready'检查通道中是否存在至少一个对象,以及 `wait'等待对象变为可用。

  • 最初,对象 'Channel'处于打开状态。 这意味着数据可以自由地从中读取并使用调用写入。 '拿!'放!. 功能 `close'关闭对象 '通道'。 如果对象是 `频道'关闭,呼叫 '放!'会失败。 例如:

    julia> c = Channel(2);
    
    julia> put!(c, 1) # Вызов `put!` для открытого канала завершается успешно.
    1
    
    julia> close(c);
    
    朱莉娅>放!(c,2)#Call'put!'导致关闭通道的异常。
    错误:InvalidStateException:通道已关闭。
    [医]堆垛机:
    [...]
  • 方法 '拿!'和方法 `fetch'(接收一个值但不删除它)对于一个关闭的通道,任何可用的值都被成功返回,直到通道变为空。 让我们继续上面的例子。

    julia> fetch(c) # Любое количество вызовов`fetch` завершается успешно.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # При первом вызове `take!` значение удаляется. 1
    
    julia> take!(c) # В закрытом канале больше нет данных.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

让我们来看一个使用通道在任务之间进行交互的简单示例。 让我们运行四个任务来处理来自单个"作业"通道的数据。 通道包含由’job_id’标识的作业。 此示例中的每个任务读取’job_id`,等待随机的时间量,并将由`job_id’和超时组成的元组写入结果通道。 最后,输出来自"结果"通道的所有数据。

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # Имитирует время, затрачиваемое на реальную работу
                                               # обычно выполняемую где-то еще.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий

julia> for i in 1:4 # запускает 4 задачи для параллельной обработки запросов
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # Выводим результаты
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

而不是’errormonitor(t)`,更可靠的解决方案可能是使用`bind(results,t)`函数,因为这不仅会注册所有意外失败,还会强制关闭所有关联的资源,并且会传递异常。

其他任务操作

任务操作基于低级原语 'yieldto'。 调用’yieldto(task,value)'会挂起当前任务,切换到`task`参数中指定的任务,并分配最后一次调用 'yieldto'对于此任务,返回’value’中指定的值。 注意了: 'yieldto'是唯一应该使用任务式执行顺序的操作;而不是调用和返回控制,我们只是切换到另一个任务。 这就是为什么任务也被称为"对称协程":切换到和从每个任务使用相同的机制发生。

'yieldto'是一个非常有效的函数,但在使用任务时很少直接调用它。 让我们来看看可能的原因。 如果您从当前任务切换,那么您可能需要在一段时间后切换回它。 但为了确定切换的正确时刻,以及负责这种切换的任务,需要严格协调动作。 例如,方法 '放!`'拿!'阻止其状态保存在通道使用上下文中的操作,以便可以识别消费者。 恰恰是没有必要跟踪任务-消费者手动制作方法 '放!'与低级函数相比,更容易使用 'yieldto'

此外 'yieldto',需要几个更基本的功能才能有效地使用任务。

任务和事件

任务切换通常是由于等待事件(例如I/O请求)而发生的,并且由Julia中基本模块中包含的调度程序执行。 调度程序维护一个可以启动的任务队列,并执行一个事件循环,该事件循环在外部事件发生时重新启动任务,例如消息。

等待事件的基本功能 — '等待'。 功能 'wait'实现了许多对象。 例如,对于’Process’对象,函数 'wait'正在等待它的执行完成。 功能 'wait'通常是隐含的;例如, `wait'可以在调用内部执行 `read'等待数据可用性。

在所有这些情况下,功能 `wait'最终与对象一起工作 'Condition',它负责排队和重新启动任务。 当任务调用函数时 `wait'当条件发生时 'Condition',则标记为未启动,加入条件队列并切换到调度器。 之后,调度程序选择要运行的另一个任务或阻止等待外部事件。 如果一切正常,事件处理程序最终会调用该函数 'notify',以便挂起的任务可以再次运行。

通过调用显式创建的任务 `Task',最初是调度程序未知的。 这允许您手动管理任务,如果需要,使用 'yieldto'。 但是,当这样的任务正在等待事件时,它仍然会在发生时自动重新启动,正如人们所期望的那样。