Engee 文档

任务

Task(func)

创建一个’Task`(即协程)来执行这个`func’函数(必须在没有参数的情况下调用)。 此函数返回时任务终止。 任务将在构造期间从父对象确定"世界年龄"方法的层次结构中执行。 '时间表'

默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。

例子

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

在此示例中,`b’表示尚未启动的随时可运行的任务。

@task

将表达式包装在 `Task',而不执行它,并返回 '任务'。 此操作只创建一个任务,而不执行它。

默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。

例子

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
@async

将表达式包装在 'Task'并将其添加到本地计算机的调度程序队列中。

值可以使用`$插值到@async`中,它将值直接复制到构造的基本闭包中。 这允许您插入变量的_value,将异步代码与当前任务中变量值的更改隔离开来。

强烈建议始终优先考虑"线程"。即使不需要并行*,@spawn`而不是`@async`*,特别是在公共库中。 这是因为使用'@async’禁止在当前Julia实现中的工作线程之间传输父任务。 因此,在库函数中看似无辜地使用`@async’可能会对用户应用程序各个部分的性能产生很大影响。

兼容性:Julia1.4

从Julia1.4开始,可以使用`$'插值值。

asyncmap(f, c...; ntasks=0, batch_size=nothing)

使用多个并行任务将`f’映射到一个集合(或相同长度的多个集合)。 对于集合的多个参数,`f’是逐个元素应用的。

'ntasks’表示要并行运行的任务数。 如果未指定"ntasks",则最多将使用100个任务进行并行匹配,具体取决于集合的长度。

'ntasks’也可以指定为不带参数的函数。 在这种情况下,在处理每个元素之前检查并行运行的任务数,并且如果`ntasks_func`的值大于当前任务数,则启动新任务。

如果指定了’batch_size',则以批处理模式处理集合。 在这种情况下,'f’应该是一个接受参数元组向量并返回结果向量的函数。 输入向量的长度必须等于’batch_size’的长度或更小。

以下示例演示如何通过返回在其中执行匹配函数的任务的`objectid`来执行各种任务。

首先,如果未设置`ntasks’值,则在不同的任务中处理每个项目。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

如果值为’ntasks=2',则在两个任务中处理所有项目。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

如果设置了’batch_size’值,则必须修改匹配函数,使其接受参数元组数组并返回结果数组。 为此,请在修改后的映射函数中使用’map'。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

同样地 'asyncmap',但结果保存在’results`中,并返回集合。

如果任何修改的参数被放置在与任何其他参数相同的内存区域中,则行为可能是意外的。

current_task()

返回当前正在运行的任务 '任务'

istaskdone(t::Task) -> Bool

确定任务是否已完成其工作。

例子

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
istaskstarted(t::Task) -> Bool

确定任务是否已开始执行。

例子

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
istaskfailed(t::Task) -> Bool

确定任务是否由于异常而完成。

例子

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
兼容性:Julia1.3

此功能需要1.3或更高版本的Julia。

task_local_storage(key)

在任务的本地存储中搜索当前任务的键值。

task_local_storage(key, value)

为当前任务的任务本地存储中的键分配值。

task_local_storage(body, key, value)

调用具有更改存储的`body’函数,该函数是任务本地的,其中’value’值被分配给’key`;以前的’key’值或其缺失稍后恢复。 适用于模拟动态范围。

规划

yield()

切换到调度程序以允许执行另一个计划任务。 调用此函数的任务仍然准备好执行,如果没有其他任务准备好执行,将立即重新启动。


yield(t::Task, arg = nothing)

调度不准确的`schedule(t,arg);yield()的快速版本,在调用调度程序之前立即输出`t

yieldto(t::Task, arg = nothing)

切换到给定的任务。 第一次切换时,调用task函数时不带参数。 在随后的开关上,返回最后一个任务"yieldto"调用的"arg"。 这是一个低级调用,它只切换任务,而不考虑状态或调度。 不建议使用它。

sleep(seconds)

在指定的秒数内阻止当前任务。 最小等待时间为1毫秒(输入值'0.001'`。

schedule(t::Task, [val]; error=false)

添加任务 `Task'到调度程序队列。 这可确保任务在系统处于非活动状态时持续运行,仅当任务未执行阻塞操作时,例如 '等待'

如果指定了第二个参数’val`,它将传递给任务(通过返回值 'yieldto')重新启动时。 如果’error’具有值’true',则在唤醒的任务中将该值作为异常调用。

对于已经启动的任意"任务"使用"计划"是不正确的。 有关详细信息,请参阅 API帮助

默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。

例子

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true

同步

errormonitor(t::Task)

当任务’t’失败时,将错误日志输出到’stderr'。

例子

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
@sync

等待直到所有词汇确定的使用 '@异步', @spawn,'分布式。@spawnat’和’分布式。@distributed’将被执行。 嵌套异步操作引发的所有异常都被收集并发出为 'CompositeException'

例子

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2

特别注意事项 '线程。条件'

呼叫者必须持有锁(lock'),它拥有’线程。条件,在调用此方法之前。 调用任务将被阻塞,直到它被其他任务唤醒,通常是通过调用 `notify'对于相同的’Threads’对象。条件'。 如果执行了锁,它将被原子释放(即使它是递归执行的),并将在控件返回之前重新获取。


wait(r::Future)

等待指定的值可用 '未来`


wait(r::RemoteChannel, args...)

等待一个值在指定的值上可用 'RemoteChannel`


wait([x])

阻止当前任务,直到发生事件,具体取决于参数的类型。:

  • `Channel':等待一个值附加到通道。

  • '条件':等待 notify'条件并返回传递给’notify’的’val’参数。 等待条件另外允许您传递’first=true,这导致等待任务首先放在通知唤醒队列('notify')中,而不是通常的FIFO行为。

  • "进程":等待一个进程或一系列进程完成。 进程的"exitcode"字段可用于确定它是成功还是失败。

  • `Task':等待一个`Task’完成。 如果任务失败并出现异常,则发出’TaskFailedException'(它封装了失败的任务)。

  • `RawFD':等待文件描述符的更改(请参阅`FileWatching`包)。

如果没有传递参数,则任务将被无限期地阻止。 只能通过显式调用来重新启动任务 '附表''yieldto'

"等待"通常在"while"循环中被调用,以确保在继续之前满足预期条件。


wait(c::Channel)

阻止任务,直到’通道’准备就绪('已经准备好')。

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # задача блокируется, так как канал не готов
false

julia> put!(c, 1);

julia> istaskdone(task)  # теперь задача разблокирована
true
fetch(t::Task)

等待任务完成 'Task',然后返回其结果的值。 如果任务失败并出现异常,则发出 'TaskFailedException'(封装失败的任务)。

fetch(x::Any)

返回’x'。

timedwait(testcb, timeout::Real; pollint::Real=0.1)

等待’testcb()将返回值`true,或者当`timeout’秒数过去时,取决于之前发生的情况。 测试功能每`pollint`秒轮询一次。 Pollint的最小值为0.001s,即1毫秒。

返回`:ok’或':timed_out'。

例子

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
Condition()

创建任务可以预期的前端事件源。 导致的任务 wait'对于’Condition',被挂起并放置在队列中。 当任务被唤醒时 `notify'后来被称为’Condition'。 如果使用可选参数,等待条件可能会返回值或导致错误。 '通知。 在前面运行意味着只有在调用期间等待的任务才能被唤醒。 '通知'。 对于级别触发的通知,必须维护一个额外的状态来跟踪是否发生了通知。 这是使用类型完成的 '通道''线程。事件',并且它们可用于级别触发的事件。

此对象不是线程安全的。 有关线程安全版本的信息,请参阅说明 '线程。条件'

Threads.Condition([lock])

线程安全版本 '基地。条件'

打电话 '等待'通知为’线程。条件`,必须首先调用它 '锁'。 当调用’wait’时,锁在锁定期间被原子释放,并将在`wait`返回之前被回收。 因此,"线程"的惯用语。条件''c’看起来像这样:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
兼容性:Julia1.2

此功能需要至少1.2的Julia版本。

Event([autoreset=false])

创建级别触发的事件源。 导致的任务 wait'对于"事件",暂停和排队,直到调用 `通知为’事件'。 调用`notify`后,'Event’保持在alarm状态,tasks将不再阻塞,在调用`reset`时等待它。

如果’autoreset’设置为true,则每个`notify`调用的`wait`最多将释放一个任务。

这使得能够在接收和释放存储器以用于通知或等待时组织存储器。

兼容性:Julia1.1

此功能需要至少1.1的Julia版本。

兼容性:Julia1.8

'Autoreset’功能和内存排序保证要求Julia版本不低于1.8。

notify(condition, val=nothing; all=true, error=false)

通过将它们传递给`val’来唤醒等待条件的任务。 如果’all’设置为’true'(默认情况下),则唤醒所有挂起的任务,否则只有一个是。 如果’error’具有值’true',则在唤醒的任务中将传递的值作为异常调用。

返回已唤醒任务的数量。 如果没有任务等待`condition',则返回0。

reset(::Event)

重置事件 事件'回到未知状态。 然后,所有后续的"等待"调用将被阻止,直到重复调用。 '通知

Semaphore(sem_size)

创建一个计数信号量,允许您随时使用接收的最大"sem_size"。 每张收据必须对应一项豁免。

这使得能够在接收和释放时布置存储器以用于接收或释放。

acquire(s::Semaphore)

等待其中一个"sem_size"权限可用,锁定它直到可以获得。


acquire(f, s::Semaphore)

从信号量接收到`s`后执行`f`,并在完成或错误时释放(`release')。

例如,do-block表单,它保证只有两个对`foo`的调用,将在同一时间处于活动状态。:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
兼容性:Julia1.8

此方法需要至少1.8的Julia版本。

release(s::Semaphore)

向池返回一个权限,如果可能,允许另一个任务接收它并恢复执行。

AbstractLock

描述实现同步原语的类型的抽象超类型: '锁', 'trylock`, '解锁''islocked`

lock(lock)

当锁可用时获取锁。 如果锁已经被另一个任务或另一个线程阻塞,它会等待直到它变得可用。

每个锁必须对应一个解锁(`解锁')。


lock(f::Function, lock)

获取’lock’锁,在持有`lock`锁的情况下执行`f`,并在`f`的控制权返回时释放`lock`锁。 如果锁已经被另一个任务或另一个线程阻塞,它会等待直到它变得可用。

当这个函数返回时,锁被释放,所以调用者不应该尝试解锁它。

另请参阅说明 '@lock'

兼容性:Julia1.7

供使用 'Channel'需要Julia至少1.7的版本作为第二个参数。


锁定(F::函数,l::可锁定)

获取与"l"相关联的锁,在持有锁的情况下执行"f",并在"f"的控制权返回时释放锁。 'f’将接收一个位置参数:包含在`l`中的值。 如果锁已经被另一个任务或另一个线程阻塞,它会等待,直到它变得可用。 当这个函数返回时,锁被释放,所以调用者不应该尝试解锁它。

兼容性:Julia1.11

要求Julia的版本不低于1.11。

unlock(lock)

释放`锁’的所有权。

如果是先前获得的递归锁,则减小内部计数器的值并立即返回控制。

trylock(lock) -> Success (Boolean)

如果锁可用,则获取锁,并在成功时返回值"true"。 如果锁已经被另一个任务或线程阻塞,则返回’false'。

每个成功的"trylock"锁必须对应一个解锁(`解锁')。

"Trylock"功能与 'islocked`可用于编写检查-检查-安装算法或指数延迟算法_if支持`typeof(lock)'_(查看相关文档)。

islocked(lock) -> Status (Boolean)

检查"锁"是否被任何任务或线程持有。 此函数本身不应用于同步。 但是,'islocked’功能与 'trylock'可用于编写检查-检查-安装算法或指数延迟算法_if支持`typeof(lock)'_(查看相关文档)。

高级帮助

例如,指数延迟可以如下实现,如果`锁`实现满足下面描述的属性。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

实施

要实现锁,建议使用以下属性定义`islocked`,并在相应的docstring中指定这一点。

  • 'Islocked(锁定)`功能不受"数据竞争"的影响。

  • 如果’islocked(lock)返回`false,如果没有其他任务的干扰,立即调用`trylock(lock)`应该是成功的(返回`true')。

ReentrantLock()

为任务同步创建传入锁 '任务'。 相同的任务可以根据需要多次被阻止。 每个 'lock'必须伴随 '解锁'

调用’lock’也会阻止在相应的’unlock`之前执行此线程的终止方法。 使用下面所示的标准锁定模式自然应该得到支持,但要注意反转锁定尝试的顺序或完全跳过重试块(例如,在锁定仍在持有时尝试返回):

这使得能够在接收和释放用于阻塞或解除阻塞呼叫时组织存储器。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

如果 `!islocked(lck::ReentrantLock)'正在举行, 'trylock(lck’成功,除非有其他任务试图在同一时间持有锁。

@lock l expr

Lock(f,l::AbstractLock)`作为宏的版本,但使用表达式`expr`而不是函数`f。 扩展到以下内容:

lock(l)
try
    expr
finally
    unlock(l)
end

这类似于使用 `lock'使用’do’块,但避免创建闭包,因此可以提高性能。

兼容性

宏"@lock"在Julia1.3中添加,并在Julia1.10中导出。

可锁定(值,锁定=可重入锁())

创建一个"可锁定"对象,该对象封装"值"并将其绑定到提供的"锁定"。 此对象支持 '@锁', '锁', 'trylock', '解锁'。 要访问该值,请在保持锁定的同时访问正在锁定的对象的索引。

兼容性:Julia1.11

要求Julia的版本不低于1.11。

例子

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # для доступа к значению должна удерживаться блокировка
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"

渠道

AbstractChannel{T}

传输类型为`T’的对象的信道的表示。

Channel{T=Any}(size::Int=0)

创建一个具有内部缓冲区的通道,该缓冲区可以包含类型为`T`的对象,最大数量可达`size'。 '放!'调用完整通道块,直到使用 '拿!`.

'Channel(0)创建一个无缓冲通道。 "放!'执行锁定,直到相应的’采取!'被称为。. 反之亦然。

其他构造函数:

  • 'Channel(':默认构造函数,相当于'+Channel{Any}(0)+

  • 'Channel(Inf’:相当于'Channel{Any}(typemax(Int))`

  • 'Channel(sz)':相当于'Channel{Any}(sz)`

兼容性:Julia1.3

默认构造函数是`Channel()`和`size=0`是在Julia1.3中默认添加的。

Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

创建一个基于"func"的新任务,将其绑定到一个大小为"size"的"T"类型的新通道,并安排任务;所有这些都是在一次调用中完成的。 当任务完成时,通道会自动关闭。

'func’应该将链接通道作为唯一的参数。

如果您需要指向已创建任务的链接,请传递对象'Ref{Task}`通过命名参数’taskref'。

如果’spawn=true`,则为`func’创建的’Task’可以并行调度到另一个线程,这相当于通过 '线程。@spawn'

如果’spawn=true’并且没有设置’threadpool’参数,则默认为`:default'。

如果设置了’threadpool`参数(为`:default`或`:interactive`),则意味着`spawn=true’并在指定的线程池中创建新任务。

返回"通道"。

例子

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

创建指向已创建任务的链接

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
兼容性:Julia1.3

在Julia1.3中添加了`spawn='参数。 这个构造函数是在Julia1.3中添加的。 在早期版本中,Julia通道使用关键字参数来设置’size’和’T',但不推荐使用这些构造函数。

兼容性:Julia1.9

在Julia1.9中添加了`threadpool='参数。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
put!(c::Channel, v)

将’v’元素添加到’c’通道。 如果通道已满,则执行阻塞。

对于无缓冲通道,执行阻塞直到 '拿!`不会被另一个任务完成。

兼容性:Julia1.1

"V"现在转换为通道类型,使用 '转换',因为’放!'被称为。

take!(c::Channel)

删除并返回一个值 '通道'按顺序。 执行锁定,直到数据变为可用。 对于无缓冲通道,执行阻塞直到 '放!`不会被另一个任务完成。

例子

缓冲通道:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

无缓冲通道:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
isready(c::Channel)

确定是否具有 'Channel'存储在其中的值。 立即返回控制权,锁不执行。

对于无缓冲通道,如果有任务等待,则返回值"true"。 '放!`.

例子

缓冲通道:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

无缓冲通道:

julia> c = Channel();

julia> isready(c)  # нет задач, ожидающих put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # планирование задачи put!

julia> isready(c)
true
fetch(c::Channel)

等待并返回(不删除)"通道"中的第一个可用元素。 注意。 在无缓冲(零大小)的"通道"中不支持"fetch"。

例子

缓冲通道:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # элемент не удаляется
3-element Vector{Any}:
 1
 2
 3
close(c::Channel[, excp::Exception])

关闭通道。 抛出异常(也可以通过`excp’给出):

bind(chnl::Channel, task::Task)

将`chnl`的生存期与任务进行比较。 '通道’当任务完成时,'chnl`会自动关闭。 任务中任何未检测到的异常都会传播到`chnl`的所有挂起对象。

无论任务完成与否,都可以显式关闭’chnl`对象。 完成任务不会影响已经关闭的"通道"对象。

当通道链接到多个任务时,要终止的第一个任务将关闭通道。 当多个通道链接到同一任务时,终止该任务将关闭所有相关通道。

例子

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

朱莉娅>对于我在c
           @展示我
       结束;
i=1
i=2
i=3
i=4

朱莉娅>isopen(c)
错误
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]

使用"计划"和"等待"进行低级同步

最简单的方法是正确使用它 'schedule`在尚未启动(计划)的`Task’中。 然而 "附表`'wait'可以用作构建同步接口的低级标准块。 调用`schedule(task)最重要的前提是调用者必须"拥有"`task。 也就是说,她必须知道在这个`任务`中对`等待`的调用发生在调用`计划(任务)'的代码已知的地方。 提供这种先决条件的一种策略是使用原子操作,如下面的示例所示:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
                # уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
                # задачу ожидания.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Поскольку предполагается, что существует только одна задача уведомления (для простоты),
                # мы знаем, что другим возможным случаем является OWE_EMPTY.
                # Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
                # вызвала `wait(ev::OneWayEvent)`.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
        # вызовет переход OWE_WAITING -> OWE_NOTIFYING.  Задача ожидания должна
        # немедленно вызвать`wait()`.  В частности, она не должна вызывать никаких функций, которые
        # могут выдавать данные в планировщик в этой точке кода.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
        # задачей уведомления.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# вывод
notifying...
done

'OneWayEvent’允许一个任务等待('wait`)来自另一个任务的通知(`notify')。 这是一个有限的通信接口,因为`wait’只能从单个任务中使用一次(注意’ev的非原子赋值。任务')。

在这个例子中,'notify(ev::OneWayEvent)可以调用`schedule(ev.task)'当且仅当_on_将状态从`OWE_WAITING’更改为’OWE_NOTIFYING'。 这样我们就知道执行`wait(ev::OneWayEvent)`的任务现在在`ok`分支中,并且不可能有其他任务试图执行`schedule(ev.任务),因为他们的宏是'@atomicreplace(ev。state,state=>OWE_NOTIFYING)`将失败。