任务
# *`基地。@任务`*-马科罗_
@task
*例子*
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
# *`基地。@异步`*-马科罗_
@async
将表达式包装在 任务并将其添加到本地机器的调度程序队列中。
值可以插值成 @异步 经 $,它将值直接复制到构造的底层闭包中。 这允许您插入变量的_value_,将异步代码与当前任务中变量值的更改隔离开来。
|
警告强烈鼓励赞成 |
|
兼容性
Julia1.4通过内插值 |
# *`基地。异步映射`*-函数
asyncmap(f, c...; ntasks=0, batch_size=nothing)
使用多个并发任务进行映射 f 过一个集合(或多个等长集合)。 对于多个集合参数, f 按元素应用。
输出保证与集合元素的顺序相同 c.
ntasks的 指定要并发运行的任务数。 根据集合的长度,如果 ntasks的 未指定,最多100个任务将用于并发映射。
ntasks的 也可以指定为零arg函数。 在这种情况下,在处理每个元素之前检查要并行运行的任务数,如果值为 ntasks_func 大于当前任务数。
如果 批量大小 如果指定,则以批处理模式处理集合。 f 然后必须是一个必须接受 向量资料 参数元组,并且必须返回结果的向量。 输入向量的长度为 批量大小 或更少。
以下示例通过返回 对象,对象 其中执行映射函数的任务。
首先,与 ntasks的 未定义,每个元素在不同的任务中处理。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Vector{UInt64}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
与 ntasks=2 所有元素都在2个任务中处理。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Vector{UInt64}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
与 批量大小 定义,映射函数需要更改为接受参数元组数组并返回结果数组。 地图 被用在修改后的映射函数中来实现这一点。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Vector{String}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
# *`基地。伊斯塔斯克多内`*-函数
istaskdone(t::Task) -> Bool
确定任务是否已退出。
*例子*
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
# *`基地。开始了`*-函数
istaskstarted(t::Task) -> Bool
确定任务是否已开始执行。
*例子*
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
# *`基地。istaskfailed`*-函数
istaskfailed(t::Task) -> Bool
确定任务是否因为引发异常而退出。
*例子*
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
|
兼容性
Julia1.3此功能至少需要Julia1.3。 |
# *`基地。任务_local_storage`*-Method
task_local_storage(body, key, value)
调用函数 身体 与修改后的任务-本地存储,其中 价值 被分配到 钥匙;的前值 钥匙,或缺乏,事后恢复。 适用于模拟动态范围。
# *`核心。[医]并发暴力`*-类型
ConcurrencyViolationError(msg) <: Exception
发生可检测的并发语义冲突时引发的错误。
一个非详尽的例子列表,说明何时使用 include:
*检测到死锁时抛出(例如 等待(current_task()))
*已知不安全行为被尝试(例如 产量(current_task))
*尝试从多个并发任务中修改已知的非线程安全数据结构
*未被此任务锁定的锁正在解锁
时间表
# *`基地。产量`*-函数
yield(t::Task, arg = nothing)
一个快速,不公平的调度版本 附表(t,arg);收益率() 它立即屈服于 t 之前调用调度程序。
抛出一个 [医]并发暴力 如果 t 是当前正在运行的任务。
yield()
切换到计划程序以允许另一个计划任务运行。 调用此函数的任务仍然是可运行的,如果没有其他可运行的任务,将立即重新启动。
# *`基地。耶尔德托`*-函数
yieldto(t::Task, arg = nothing)
切换到给定的任务。 第一次切换到任务时,调用任务的函数时不带参数。 在随后的开关, arg,arg 从任务的最后一次调用返回到 耶尔德托. 这是一个低级调用,只切换任务,不以任何方式考虑状态或调度。 它的使用是不鼓励的。
# *`基地。时间表`*-函数
schedule(t::Task, [val]; error=false)
如果第二个参数 瓦尔 被提供,它将被传递给任务(通过 耶尔德托)再次运行时。 如果 错误 是 真的,该值在唤醒任务中作为异常引发。
|
警告使用不正确 |
*例子*
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同步
# *`基地。勘误者`*-函数
errormonitor(t::Task)
打印错误日志到 斯德尔 如果任务 t 失败。
*例子*
julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
Unhandled Task ERROR: task failed
Stacktrace:
[...]
# *`基地。@同步`*-马科罗_
@sync
等到所有词汇封闭的使用 @异步, @产卵, 分布的。@产卵 和 分布。@分布式 都完成了。 所有由封闭的异步操作引发的异常都被收集并作为一个 CompositeException.
*例子*
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
# *`基地。等等!`*-函数
wait([x])
阻止当前任务,直到发生某些事件。
* 频道:等待一个值附加到通道。
* 条件:等待 通知条件和返回 瓦尔 传递给 通知. 查看 条件-特定的docstring 等等! 对于确切的行为。
* 过程:等待进程或进程链退出。 该 exitcode 过程的字段可用于确定成功或失败。
* 任务:等待一个 任务 完成。 查看 任务-特定的docstring 等等! 对于确切的行为。
* RawFD:等待文件描述符的更改(请参阅 文件处理 包)。
经常 等等! 在一个 而 循环以确保在继续之前满足等待条件。
# *`基地。等等`*-函数
waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)
等待,直到至少一个给定的任务已经完成。
如果 投掷 是 真的,扔 CompositeException 当其中一个已完成的任务完成时出现异常。
返回值由两个任务向量组成。 第一个由已完成的任务组成,另一个由未完成的任务组成。
|
警告与编写使用多个单独任务的代码相比,这可能会很难扩展,每个任务都是串行运行的,因为这需要扫描 |
|
兼容性
Julia1.12这个函数至少需要Julia1.12。 |
# *`基地。等等`*-函数
waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)
等待,直到所有给定的任务已经完成。
如果 失败,失败 是 真的,当至少一个给定的任务异常完成时,函数将返回。 如果 投掷 是 真的,扔 CompositeException 当其中一个已完成的任务失败时。
失败,失败 和 投掷 关键字参数独立工作;当只 投掷=真 被指定,此函数等待所有任务完成。
返回值由两个任务向量组成。 第一个由已完成的任务组成,另一个由未完成的任务组成。
|
兼容性
Julia1.12这个函数至少需要Julia1.12。 |
# *`基地。取货/取货`*-Method
fetch(t::Task)
等待一个 任务完成,然后返回其结果值。 如果任务失败并出现异常,则 TaskFailedException异常(其中包装失败的任务)被抛出。
# *`基地。时间,时间`*-函数
timedwait(testcb, timeout::Real; pollint::Real=0.1)
等到 测试中心() 申报表 真的 或 超时时间 秒过去了,以较早者为准。 测试功能被轮询每 波林特 几秒钟。 的最小值 波林特 为0.001秒,即1毫秒。
回来吧 :好的 或 :timed_out.
*例子*
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
# *`基地。通知`*-函数
notify(condition, val=nothing; all=true, error=false)
唤醒等待条件的任务,通过它们 瓦尔. 如果 全部 是 真的 (默认),所有等待的任务都被唤醒,否则只有一个是。 如果 错误 是 真的,传递的值在唤醒任务中作为异常引发。
返回唤醒的任务计数。 如果没有任务正在等待,则返回0 条件.
# *`基地。信号量,信号量`*-类型
Semaphore(sem_size)
创建一个计数信号量,最多允许 半尺寸/半尺寸 获得在任何时候使用。 每个获取必须与一个发布匹配。
这提供了获取/释放调用时的获取和释放内存顺序。
# *`基地。收购;收购`*-函数
acquire(f, s::Semaphore)
执行 f 从信号量获取后 s,而 发行版 在完成或错误。
例如,一个do-block形式,确保只有2个调用 [医]脚 将在同一时间处于活动状态:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
|
兼容性
Julia1.8此方法至少需要Julia1.8。 |
acquire(s::Semaphore)
等待其中一个 半尺寸/半尺寸 许可证可用,阻止,直到一个可以获得。
# *`基地。锁,锁`*-函数
lock(f::Function, l::Lockable)
获取与 l,执行 f 保持锁,并在 f 回报。 f 将收到一个位置参数:由 l. 如果锁已经被不同的任务/线程锁定,请等待它变得可用。 当这个函数返回时, 锁,锁 已被释放,因此调用者不应尝试 解锁 它。
|
兼容性
Julia1.11至少需要Julia1.11。 |
lock(f::Function, lock)
获取 锁,锁,执行 f 与 锁,锁 持有,并释放 锁,锁 何时 f 回报。 如果锁已经被不同的任务/线程锁定,请等待它变得可用。
当这个函数返回时, 锁,锁 已被释放,因此调用者不应尝试 解锁 它。
请参阅: @锁.
|
兼容性
朱莉娅1.7使用a |
lock(lock)
获取 锁,锁 时变得可用。 如果锁已经被不同的任务/线程锁定,请等待它变得可用。
每个 锁,锁 必须与 解锁.
# *`基地。[医]岛`*-函数
islocked(lock) -> Status (Boolean)
检查是否 锁,锁 由任何任务/线程持有。 仅此功能不应用于同步。 然而, [医]岛 结合于 [医trylock]可用于编写test-and-test-and-set或指数退避算法_如果它由 `打字(锁)`_(阅读其文档)。
*扩展帮助*
例如,指数退避可以实现如下,如果 锁,锁 实现满足了下面记录的属性。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
*实施*
建议定义锁定实现 [医]岛 具有以下属性,并在其docstring中记下它。
* islocked(锁定) 没有数据竞争。
*如果 islocked(锁定) 申报表 错误,立即调用 锁(锁) 必须成功(回报 真的)如果没有其他任务的干扰。
# *`基地。重入锁,重入锁`*-类型
ReentrantLock()
打电话来 锁,锁 也将禁止在该线程上运行终结器,直到相应的 解锁. 自然应该支持使用下面所示的标准锁模式,但要注意反转try/lock顺序或完全丢失try块(例如,试图在锁仍然保持的情况下返回):
这提供了锁定/解锁调用的获取/释放内存顺序。
lock(l)
try
<atomic work>
finally
unlock(l)
end
如果 !islocked(lck::ReentrantLock)持有, trylock(lck)成功,除非有其他任务试图保持锁"在同一时间。"
# *`基地。可上锁`*-类型
Lockable(value, lock = ReentrantLock())
|
兼容性
Julia1.11至少需要Julia1.11。 |
*例子*
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # must hold the lock to access the value
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
渠道
# *`基地。频道`*-类型
Channel{T=Any}(size::Int=0)
频道(0) 构造无缓冲通道。 放! 块,直到匹配 拿! 被调用。 反之亦然。
其他构造函数:
* 频道():默认构造函数,等价于 频道{Any}(0)
* 频道(Inf):相当于 频道{Any}(typemax(Int))
* 频道(sz):相当于 频道{Any}(sz)
|
兼容性
Julia1.3默认构造函数 |
# *`基地。频道`*-Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
创建一个新的任务 [医]func, 绑定它到一个新的类型通道 T 和尺寸 大小,并安排任务,所有在一个单一的调用。 当任务终止时,通道会自动关闭。
[医]func 必须接受绑定通道作为其唯一参数。
如果您需要对创建的任务的引用,请传递 参考{Task} 对象通过关键字参数 任务ref.
如果 产卵=真,的 任务 为 [医]func 可以并行调度在另一个线程上,相当于通过 线程。@产卵.
如果 产卵=真 而 线程池 参数未设置,默认为 :默认值.
如果 线程池 参数设置(to :默认值 或 :互动),这意味着 产卵=真 并将新任务生成到指定的threadpool。
返回a 频道.
*例子*
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
引用创建的任务:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
|
兼容性
朱莉娅1.3 |
|
兼容性
朱莉娅1.9 |
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end;
julia> String(collect(chnl))
"hello world"
# *`基地。伊斯富尔`*-Method
isfull(c::Channel)
确定是否 频道是满的,在这个意义上,调用 放!(c,some_value) 会被封锁的。 立即返回,不阻塞。
请注意,它可能经常是这样的情况 放! 此返回后不会阻塞 真的. 用户必须采取预防措施,不要通过调用此方法意外地在其代码中创建实时锁定错误,因为这些错误通常比死锁更难调试。 也有可能是 放! 此调用返回后将阻塞 错误,如果有多个生产者任务调用 放! 并行。
*例子*
缓冲通道:
julia> c = Channel(1); # capacity = 1
julia> isfull(c)
false
julia> put!(c, 1);
julia> isfull(c)
true
无缓冲通道:
julia> c = Channel(); # capacity = 0
julia> isfull(c) # unbuffered channel is always full
true
# *`基地。已经准备好了`*-Method
isready(c::Channel)
确定是否 频道具有存储在其中的值。 立即返回,不阻塞。
对于无缓冲通道,返回 真的 如果有任务在等待 放!.
*例子*
缓冲通道:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
无缓冲通道:
julia> c = Channel();
julia> isready(c) # no tasks waiting to put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # schedule a put! task
julia> isready(c)
true
# *`基地。isopen的`*-Method
isopen(c::Channel)
*例子*
带任务的缓冲通道:
julia> c = Channel(ch -> put!(ch, 1), 1);
julia> isopen(c) # The channel is closed to new `put!`s
false
julia> isready(c) # The channel is closed but still contains elements
true
julia> take!(c)
1
茱莉亚>已准备好(c)
错误
无缓冲通道:
julia> c = Channel{Int}();
julia> isopen(c)
true
julia> close(c)
julia> isopen(c)
false
# *`基地。取货/取货`*-Method
fetch(c::Channel)
等待并返回(不删除)从 频道. 注: 取货/取货 在无缓冲(0大小)上不受支持 频道.
*例子*
缓冲通道:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # item is not removed
3-element Vector{Any}:
1
2
3
# *`基地。绑定`*-Method
bind(chnl::Channel, task::Task)
关联的生命周期 chnl 有任务。 频道 chnl 任务终止时自动关闭。 任务中任何未捕获的异常都会传播到 chnl.
该 chnl 可以独立于任务终止显式关闭对象。 终止任务对已关闭没有影响 频道 物体。
当一个通道绑定到多个任务时,第一个终止的任务将关闭该通道。 当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。
*例子*
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia>绑定(c,任务);
朱莉娅>对于我在c
@展示我
结束;
i=1
i=2
i=3
i=4
朱莉娅>isopen(c)
错误
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
使用低级同步 时间表 和 等等!
最简单的正确使用 时间表是在一个 任务 尚未启动(计划)。 但是,可以使用 时间表和 等等!作为构建同步接口的非常低级的构建块。 打电话的一个关键先决条件 时间表(任务) 呼叫者必须"拥有" 任务 即,它必须知道,调用 等等! 在给定的 任务 发生在代码调用已知的位置 时间表(任务). 确保这种前置条件的一种策略是使用原子,如下面的示例所示:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
Threads.@spawn begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
N.紧急情况 让一个任务 等等! 对于另一个任务的 通知. 它是一个有限的通信接口,因为 等等! 只能从单个任务中使用一次(注意 ev。任务)
在这个例子中, 通知(ev::OneWayEvent) 被允许调用 时间表(ev.任务) 当且仅当_it_修改状态从 OWE_WAITING 到 OWE_NOTIFYING. 这让我们知道正在执行的任务 等待(ev::OneWayEvent) 现在在 好的 分支,并且不能有其他任务试图 时间表(ev.任务) 由于他们的 @atomicreplace(ev。状态,状态=>OWE_NOTIFYING) 会失败。