AnyMath 文档

任务

Task(func[, reserved_stack::Int])

创建一个 任务 (即协程)来执行给定的函数 [医]func (必须是可调用的,没有参数)。 此函数返回时任务退出。 该任务将运行在"世界时代"从父在建设时 时间表d.

可选项 [医]叠片 参数指定此任务可用堆栈的大小,以字节为单位。 默认值, 0,使用依赖于系统的堆栈大小默认值。

默认情况下,警告任务将粘性位设置为true 粘粘的. 这为历史默认值建模 @异步. 粘性任务只能在它们首先被调度的工作线程上运行,并且当被调度时将使它们从粘性中被调度的任务。 获得的行为 线程。@产卵手动设置粘性位为 错误.

*例子*

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

在这个例子中, b 是可运行的 任务 还没开始呢。

@task

将表达式包装在 任务而不执行它,并返回 任务. 这只会创建一个任务,而不会运行它。

默认情况下,警告任务将粘性位设置为true 粘粘的. 这为历史默认值建模 @异步. 粘性任务只能在它们首先被调度的工作线程上运行,并且在调度时将使它们从粘性中被调度的任务。 获得的行为 线程。@产卵手动设置粘性位为 错误.

*例子*

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
@async

将表达式包装在 任务并将其添加到本地机器的调度程序队列中。

值可以插值成 @异步$,它将值直接复制到构造的底层闭包中。 这允许您插入变量的_value_,将异步代码与当前任务中变量值的更改隔离开来。

警告强烈鼓励赞成 线程。@产卵 结束 @异步 始终*即使不需要并行性*特别是在公共分布式库中。 这是因为使用 @异步 禁用Julia当前实现中跨工作线程迁移_parent_任务。 因此,看似无辜的使用 @异步 在库中,函数会对用户应用程序的不同部分的性能产生很大的影响。

兼容性

Julia1.4通过内插值 $ 从Julia1.4开始可用。

asyncmap(f, c...; ntasks=0, batch_size=nothing)

使用多个并发任务进行映射 f 过一个集合(或多个等长集合)。 对于多个集合参数, f 按元素应用。

输出保证与集合元素的顺序相同 c. ntasks的 指定要并发运行的任务数。 根据集合的长度,如果 ntasks的 未指定,最多100个任务将用于并发映射。 ntasks的 也可以指定为零arg函数。 在这种情况下,在处理每个元素之前检查要并行运行的任务数,如果值为 ntasks_func 大于当前任务数。

如果 批量大小 如果指定,则以批处理模式处理集合。 f 然后必须是一个必须接受 向量资料 参数元组,并且必须返回结果的向量。 输入向量的长度为 批量大小 或更少。

以下示例通过返回 对象,对象 其中执行映射函数的任务。

首先,与 ntasks的 未定义,每个元素在不同的任务中处理。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Vector{UInt64}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 所有元素都在2个任务中处理。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Vector{UInt64}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

批量大小 定义,映射函数需要更改为接受参数元组数组并返回结果数组。 地图 被用在修改后的映射函数中来实现这一点。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Vector{String}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

异步映射,但将输出存储在 结果 而不是返回集合。

当任何突变的参数与任何其他参数共享内存时,警告行为可能是意外的。

current_task()

获取当前正在运行的 任务.

istaskdone(t::Task) -> Bool

确定任务是否已退出。

*例子*

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
istaskstarted(t::Task) -> Bool

确定任务是否已开始执行。

*例子*

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
istaskfailed(t::Task) -> Bool

确定任务是否因为引发异常而退出。

*例子*

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
兼容性

Julia1.3此功能至少需要Julia1.3。

task_local_storage(key)

在当前任务的任务本地存储中查找键的值。

task_local_storage(key, value)

为当前任务的任务本地存储中的键分配值。

task_local_storage(body, key, value)

调用函数 身体 与修改后的任务-本地存储,其中 价值 被分配到 钥匙;的前值 钥匙,或缺乏,事后恢复。 适用于模拟动态范围。

ConcurrencyViolationError(msg) <: Exception

发生可检测的并发语义冲突时引发的错误。

一个非详尽的例子列表,说明何时使用 include:

*检测到死锁时抛出(例如 等待(current_task())) *已知不安全行为被尝试(例如 产量(current_task)) *尝试从多个并发任务中修改已知的非线程安全数据结构 *未被此任务锁定的锁正在解锁

时间表

yield(t::Task, arg = nothing)

一个快速,不公平的调度版本 附表(t,arg);收益率() 它立即屈服于 t 之前调用调度程序。

抛出一个 [医]并发暴力 如果 t 是当前正在运行的任务。

yield()

切换到计划程序以允许另一个计划任务运行。 调用此函数的任务仍然是可运行的,如果没有其他可运行的任务,将立即重新启动。

yieldto(t::Task, arg = nothing)

切换到给定的任务。 第一次切换到任务时,调用任务的函数时不带参数。 在随后的开关, arg,arg 从任务的最后一次调用返回到 耶尔德托. 这是一个低级调用,只切换任务,不以任何方式考虑状态或调度。 它的使用是不鼓励的。

sleep(seconds)

阻止当前任务指定的秒数。 最小睡眠时间为1毫秒或输入 0.001.

schedule(t::Task, [val]; error=false)

添加一个 任务到调度器的队列。 这会导致任务在系统空闲时不断运行,除非任务执行阻塞操作,例如 等等!.

如果第二个参数 瓦尔 被提供,它将被传递给任务(通过 耶尔德托)再次运行时。 如果 错误真的,该值在唤醒任务中作为异常引发。

警告使用不正确 时间表 在任意 任务 这已经开始。 见 API参考以获取更多信息。

默认情况下,警告任务将粘性位设置为true 粘粘的. 这为历史默认值建模 @异步. 粘性任务只能在它们首先被调度的工作线程上运行,并且当被调度时将使它们从粘性中被调度的任务。 获得的行为 线程。@产卵手动设置粘性位为 错误.

*例子*

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true

同步

errormonitor(t::Task)

打印错误日志到 斯德尔 如果任务 t 失败。

*例子*

julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
Unhandled Task ERROR: task failed
Stacktrace:
[...]
@sync

等到所有词汇封闭的使用 @异步, @产卵, 分布的。@产卵分布。@分布式 都完成了。 所有由封闭的异步操作引发的异常都被收集并作为一个 CompositeException.

*例子*

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
wait([x])

阻止当前任务,直到发生某些事件。

* 频道:等待一个值附加到通道。 * 条件:等待 通知条件和返回 瓦尔 传递给 通知. 查看 条件-特定的docstring 等等! 对于确切的行为。 * 过程:等待进程或进程链退出。 该 exitcode 过程的字段可用于确定成功或失败。 * 任务:等待一个 任务 完成。 查看 任务-特定的docstring 等等! 对于确切的行为。 * RawFD:等待文件描述符的更改(请参阅 文件处理 包)。

如果没有传递参数,则任务将阻塞未定义的周期。 任务只能通过显式调用来重新启动 时间表耶尔德托.

经常 等等! 在一个 循环以确保在继续之前满足等待条件。

waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)

等待,直到至少一个给定的任务已经完成。

如果 投掷真的,扔 CompositeException 当其中一个已完成的任务完成时出现异常。

返回值由两个任务向量组成。 第一个由已完成的任务组成,另一个由未完成的任务组成。

警告与编写使用多个单独任务的代码相比,这可能会很难扩展,每个任务都是串行运行的,因为这需要扫描 任务 每次并与每一个同步,每次这被调用。 或考虑使用 waitall(任务;failfast=true)代替。

兼容性

Julia1.12这个函数至少需要Julia1.12。

waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)

等待,直到所有给定的任务已经完成。

如果 失败,失败真的,当至少一个给定的任务异常完成时,函数将返回。 如果 投掷真的,扔 CompositeException 当其中一个已完成的任务失败时。

失败,失败投掷 关键字参数独立工作;当只 投掷=真 被指定,此函数等待所有任务完成。

返回值由两个任务向量组成。 第一个由已完成的任务组成,另一个由未完成的任务组成。

兼容性

Julia1.12这个函数至少需要Julia1.12。

fetch(t::Task)

等待一个 任务完成,然后返回其结果值。 如果任务失败并出现异常,则 TaskFailedException异常(其中包装失败的任务)被抛出。

fetch(x::Any)

回来吧 x.

timedwait(testcb, timeout::Real; pollint::Real=0.1)

等到 测试中心() 申报表 真的超时时间 秒过去了,以较早者为准。 测试功能被轮询每 波林特 几秒钟。 的最小值 波林特 为0.001秒,即1毫秒。

回来吧 :好的:timed_out.

*例子*

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
Condition()

创建任务可以等待的边缘触发事件源。 调用的任务 等等!在一个 条件 被暂停和排队。 当任务被唤醒时 通知稍后在 条件. 如果可选参数为 通知被使用。 边缘触发意味着当时只有等待的任务 通知 被叫可以被唤醒。 对于级别触发的通知,您必须保留额外的状态以跟踪通知是否已发生。 该 xref:base/parallel.adoc#Base.Channel[频道线程。活动类型执行此操作,并且可用于级别触发的事件。

此对象不是线程安全的。 见 线程。条件为线程安全版本。

Threads.Condition([lock])

线程安全版本的 基地。条件.

打电话 等等!通知在一个 线程。条件,你必须先打电话 锁,锁就可以了。 何时 等等! 被调用时,锁在阻塞过程中被原子释放,并且在阻塞之前会被重新获取 等等! 回报。 因此惯用a 线程。条件 c 如下所示:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
兼容性

Julia1.2此功能至少需要Julia1.2。

Event([autoreset=false])

创建级别触发的事件源。 调用的任务 等等!在一个 活动 被暂停和排队,直到 通知活动. 之后 通知 被称为, 活动 保持在信号状态,任务在等待时不再阻塞,直到 重置 被调用。

如果 自动装置 是真的,最多一个任务将被释放 等等! 对于每个调用 通知.

这提供了通知/等待时的获取和释放内存顺序。

兼容性

Julia1.1此功能至少需要Julia1.1。

兼容性

朱莉娅1.8 自动装置 功能和内存排序保证至少需要Julia1.8。

notify(condition, val=nothing; all=true, error=false)

唤醒等待条件的任务,通过它们 瓦尔. 如果 全部真的 (默认),所有等待的任务都被唤醒,否则只有一个是。 如果 错误真的,传递的值在唤醒任务中作为异常引发。

返回唤醒的任务计数。 如果没有任务正在等待,则返回0 条件.

reset(::Event)

重置一个 活动回到未设置状态。 然后任何未来的电话 等等! 将阻止,直到 通知被再次调用。

Semaphore(sem_size)

创建一个计数信号量,最多允许 半尺寸/半尺寸 获得在任何时候使用。 每个获取必须与一个发布匹配。

这提供了获取/释放调用时的获取和释放内存顺序。

acquire(f, s::Semaphore)

执行 f 从信号量获取后 s,而 发行版 在完成或错误。

例如,一个do-block形式,确保只有2个调用 [医]脚 将在同一时间处于活动状态:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
兼容性

Julia1.8此方法至少需要Julia1.8。

acquire(s::Semaphore)

等待其中一个 半尺寸/半尺寸 许可证可用,阻止,直到一个可以获得。

release(s::Semaphore)

将一个许可证返回到池中,可能允许另一个任务获取它并恢复执行。

AbstractLock

描述实现同步原语的类型的抽象超类型: 锁,锁, [医trylock], 解锁,和 [医].

lock(f::Function, l::Lockable)

获取与 l,执行 f 保持锁,并在 f 回报。 f 将收到一个位置参数:由 l. 如果锁已经被不同的任务/线程锁定,请等待它变得可用。 当这个函数返回时, 锁,锁 已被释放,因此调用者不应尝试 解锁 它。

兼容性

Julia1.11至少需要Julia1.11。

lock(f::Function, lock)

获取 锁,锁,执行 f锁,锁 持有,并释放 锁,锁 何时 f 回报。 如果锁已经被不同的任务/线程锁定,请等待它变得可用。

当这个函数返回时, 锁,锁 已被释放,因此调用者不应尝试 解锁 它。

请参阅: @锁.

兼容性

朱莉娅1.7使用a 频道作为第二个参数需要Julia1.7或更高版本。

lock(lock)

获取 锁,锁 时变得可用。 如果锁已经被不同的任务/线程锁定,请等待它变得可用。

每个 锁,锁 必须与 解锁.

unlock(lock)

释放 锁,锁.

如果这是以前获取的递归锁,则递减一个内部计数器并立即返回。

trylock(lock) -> Success (Boolean)

获取锁,如果它是可用的,并返回 真的 如果成功。 如果锁已经被不同的任务/线程锁定,则返回 错误.

每个成功 [医]trylock 必须与 解锁.

功能 [医]trylock 结合于 [医]可用于编写test-and-test-and-set或指数退避算法_如果它由 `打字(锁)`_(阅读其文档)。

islocked(lock) -> Status (Boolean)

检查是否 锁,锁 由任何任务/线程持有。 仅此功能不应用于同步。 然而, [医]岛 结合于 [医trylock]可用于编写test-and-test-and-set或指数退避算法_如果它由 `打字(锁)`_(阅读其文档)。

*扩展帮助*

例如,指数退避可以实现如下,如果 锁,锁 实现满足了下面记录的属性。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

*实施*

建议定义锁定实现 [医]岛 具有以下属性,并在其docstring中记下它。

* islocked(锁定) 没有数据竞争。 *如果 islocked(锁定) 申报表 错误,立即调用 锁(锁) 必须成功(回报 真的)如果没有其他任务的干扰。

ReentrantLock()

打电话来 锁,锁 也将禁止在该线程上运行终结器,直到相应的 解锁. 自然应该支持使用下面所示的标准锁模式,但要注意反转try/lock顺序或完全丢失try块(例如,试图在锁仍然保持的情况下返回):

这提供了锁定/解锁调用的获取/释放内存顺序。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

如果 !islocked(lck::ReentrantLock)持有, trylock(lck)成功,除非有其他任务试图保持锁"在同一时间。"

@lock l expr

宏版本的 锁(f,l::AbstractLock) 但随着 expr 而不是 f 函数。 扩展到:

lock(l)
try
    expr
finally
    unlock(l)
end

这类似于使用 锁,锁用一个 块,但避免了创建闭包,从而可以提高性能。

兼容性

公司,公司 @锁 在Julia1.3中添加,并在Julia1.7中导出。

Lockable(value, lock = ReentrantLock())

创建一个 可上锁 包装的对象 价值 并将其与所提供的 锁,锁. 此对象支持 @锁, 锁,锁, [医trylock], 解锁. 要访问该值,请在保持锁定的同时索引可锁定对象。

兼容性

Julia1.11至少需要Julia1.11。

*例子*

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # must hold the lock to access the value
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"

渠道

AbstractChannel{T}

传递类型对象的通道的表示 T.

Channel{T=Any}(size::Int=0)

构造a 频道 与一个内部缓冲区,可以容纳最大的 大小 类型的对象 T. 放!调用一个完整的通道块,直到一个对象被删除 拿!.

频道(0) 构造无缓冲通道。 放! 块,直到匹配 拿! 被调用。 反之亦然。

其他构造函数:

* 频道():默认构造函数,等价于 频道{Any}(0) * 频道(Inf):相当于 频道{Any}(typemax(Int)) * 频道(sz):相当于 频道{Any}(sz)

兼容性

Julia1.3默认构造函数 频道() 和默认 尺寸=0 在Julia1.3中添加。

Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

创建一个新的任务 [医]func, 绑定它到一个新的类型通道 T 和尺寸 大小,并安排任务,所有在一个单一的调用。 当任务终止时,通道会自动关闭。

[医]func 必须接受绑定通道作为其唯一参数。

如果您需要对创建的任务的引用,请传递 参考{Task} 对象通过关键字参数 任务ref.

如果 产卵=真,的 任务[医]func 可以并行调度在另一个线程上,相当于通过 线程。@产卵.

如果 产卵=真线程池 参数未设置,默认为 :默认值.

如果 线程池 参数设置(to :默认值:互动),这意味着 产卵=真 并将新任务生成到指定的threadpool。

返回a 频道.

*例子*

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

引用创建的任务:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
兼容性

朱莉娅1.3 产卵= 在Julia1.3中添加了参数。 这个构造函数是在Julia1.3中添加的。 在Julia的早期版本中,Channel使用关键字参数来设置 大小T,但这些构造函数被弃用。

兼容性

朱莉娅1.9 线程池= 在Julia1.9中添加了参数。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end;

julia> String(collect(chnl))
"hello world"
put!(c::Channel, v)

追加项目 v 到通道 c. 如果信道已满,则阻塞。

对于无缓冲通道,块,直到一个 拿!由不同的任务执行。

兼容性

朱莉娅1.1 v 现在被转换为通道的类型与 转换/转换作为 放! 被调用。

take!(c::Channel)

从a中删除并返回一个值 频道按顺序。 块,直到数据可用。 对于无缓冲通道,块,直到一个 放!由不同的任务执行。

*例子*

缓冲通道:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

无缓冲通道:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
isfull(c::Channel)

确定是否 频道是满的,在这个意义上,调用 放!(c,some_value) 会被封锁的。 立即返回,不阻塞。

请注意,它可能经常是这样的情况 放! 此返回后不会阻塞 真的. 用户必须采取预防措施,不要通过调用此方法意外地在其代码中创建实时锁定错误,因为这些错误通常比死锁更难调试。 也有可能是 放! 此调用返回后将阻塞 错误,如果有多个生产者任务调用 放! 并行。

*例子*

缓冲通道:

julia> c = Channel(1); # capacity = 1

julia> isfull(c)
false

julia> put!(c, 1);

julia> isfull(c)
true

无缓冲通道:

julia> c = Channel(); # capacity = 0

julia> isfull(c) # unbuffered channel is always full
true
isready(c::Channel)

确定是否 频道具有存储在其中的值。 立即返回,不阻塞。

对于无缓冲通道,返回 真的 如果有任务在等待 放!.

*例子*

缓冲通道:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

无缓冲通道:

julia> c = Channel();

julia> isready(c)  # no tasks waiting to put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # schedule a put! task

julia> isready(c)
true
isopen(c::Channel)

确定是否 频道是开放的新 放!操作。 请注意, 频道`可以关闭,仍然有缓冲的元素,可以用[拿!](@ref)。

*例子*

带任务的缓冲通道:

julia> c = Channel(ch -> put!(ch, 1), 1);

julia> isopen(c) # The channel is closed to new `put!`s
false

julia> isready(c) # The channel is closed but still contains elements
true

julia> take!(c)
1

茱莉亚>已准备好(c)
错误

无缓冲通道:

julia> c = Channel{Int}();

julia> isopen(c)
true

julia> close(c)

julia> isopen(c)
false
fetch(c::Channel)

等待并返回(不删除)从 频道. 注: 取货/取货 在无缓冲(0大小)上不受支持 频道.

*例子*

缓冲通道:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # item is not removed
3-element Vector{Any}:
 1
 2
 3
close(c::Channel[, excp::Exception])

关闭一个通道。 一个例外(可选地由 行政主任),被抛出:

* 放!在封闭的通道上。 * 拿!取货/取货在一个空的,封闭的通道上。

bind(chnl::Channel, task::Task)

关联的生命周期 chnl 有任务。 频道 chnl 任务终止时自动关闭。 任务中任何未捕获的异常都会传播到 chnl.

chnl 可以独立于任务终止显式关闭对象。 终止任务对已关闭没有影响 频道 物体。

当一个通道绑定到多个任务时,第一个终止的任务将关闭该通道。 当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。

*例子*

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia>绑定(c,任务);

朱莉娅>对于我在c
           @展示我
       结束;
i=1
i=2
i=3
i=4

朱莉娅>isopen(c)
错误
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]

使用低级同步 时间表等等!

最简单的正确使用 时间表是在一个 任务 尚未启动(计划)。 但是,可以使用 时间表等等!作为构建同步接口的非常低级的构建块。 打电话的一个关键先决条件 时间表(任务) 呼叫者必须"拥有" 任务 即,它必须知道,调用 等等! 在给定的 任务 发生在代码调用已知的位置 时间表(任务). 确保这种前置条件的一种策略是使用原子,如下面的示例所示:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
        # `wait()` immediately. In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    Threads.@spawn begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

N.紧急情况 让一个任务 等等! 对于另一个任务的 通知. 它是一个有限的通信接口,因为 等等! 只能从单个任务中使用一次(注意 ev。任务)

在这个例子中, 通知(ev::OneWayEvent) 被允许调用 时间表(ev.任务) 当且仅当_it_修改状态从 OWE_WAITINGOWE_NOTIFYING. 这让我们知道正在执行的任务 等待(ev::OneWayEvent) 现在在 好的 分支,并且不能有其他任务试图 时间表(ev.任务) 由于他们的 @atomicreplace(ev。状态,状态=>OWE_NOTIFYING) 会失败。