任务
# '核心。任务'-Type
Task(func)
创建一个’Task`(即协程)来执行这个`func’函数(必须在没有参数的情况下调用)。 此函数返回时任务终止。 任务将在构造期间从父对象确定"世界年龄"方法的层次结构中执行。 '时间表'。
默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。 |
例子
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
在此示例中,`b’表示尚未启动的随时可运行的任务。
# '基。@task'-Macro
@task
默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。 |
例子
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
# '基。@async'-Macro
@async
将表达式包装在 'Task'并将其添加到本地计算机的调度程序队列中。
值可以使用`$插值到
@async`中,它将值直接复制到构造的基本闭包中。 这允许您插入变量的_value,将异步代码与当前任务中变量值的更改隔离开来。
强烈建议始终优先考虑"线程"。即使不需要并行*,@spawn`而不是`@async`*,特别是在公共库中。 这是因为使用'@async’禁止在当前Julia实现中的工作线程之间传输父任务。 因此,在库函数中看似无辜地使用`@async’可能会对用户应用程序各个部分的性能产生很大影响。 |
兼容性:Julia1.4
从Julia1.4开始,可以使用`$'插值值。 |
# '基。asyncmap'-Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
使用多个并行任务将`f’映射到一个集合(或相同长度的多个集合)。 对于集合的多个参数,`f’是逐个元素应用的。
'ntasks’表示要并行运行的任务数。 如果未指定"ntasks",则最多将使用100个任务进行并行匹配,具体取决于集合的长度。
'ntasks’也可以指定为不带参数的函数。 在这种情况下,在处理每个元素之前检查并行运行的任务数,并且如果`ntasks_func`的值大于当前任务数,则启动新任务。
如果指定了’batch_size',则以批处理模式处理集合。 在这种情况下,'f’应该是一个接受参数元组向量并返回结果向量的函数。 输入向量的长度必须等于’batch_size’的长度或更小。
以下示例演示如何通过返回在其中执行匹配函数的任务的`objectid`来执行各种任务。
首先,如果未设置`ntasks’值,则在不同的任务中处理每个项目。
julia> tskoid() = objectid(current_task()); julia> asyncmap(x->tskoid(), 1:5) 5-element Array{UInt64,1}: 0x6e15e66c75c75853 0x440f8819a1baa682 0x9fb3eeadd0c83985 0xebd3e35fe90d4050 0x29efc93edce2b961 julia> length(unique(asyncmap(x->tskoid(), 1:5))) 5
如果值为’ntasks=2',则在两个任务中处理所有项目。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2) 5-element Array{UInt64,1}: 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2))) 2
如果设置了’batch_size’值,则必须修改匹配函数,使其接受参数元组数组并返回结果数组。 为此,请在修改后的映射函数中使用’map'。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input) batch_func (generic function with 1 method) julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2) 5-element Array{String,1}: "args_tuple: (1,), element_val: 1, task: 9118321258196414413" "args_tuple: (2,), element_val: 2, task: 4904288162898683522" "args_tuple: (3,), element_val: 3, task: 9118321258196414413" "args_tuple: (4,), element_val: 4, task: 4904288162898683522" "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
# '基。asyncmap!`-Function</no-翻译>
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
同样地 'asyncmap',但结果保存在’results`中,并返回集合。
如果任何修改的参数被放置在与任何其他参数相同的内存区域中,则行为可能是意外的。 |
# '基。istaskdone'-Function
istaskdone(t::Task) -> Bool
确定任务是否已完成其工作。
例子
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
# '基。istaskstarted'-Function
istaskstarted(t::Task) -> Bool
确定任务是否已开始执行。
例子
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
# '基。istaskfailed'-Function
istaskfailed(t::Task) -> Bool
确定任务是否由于异常而完成。
例子
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
兼容性:Julia1.3
此功能需要1.3或更高版本的Julia。 |
# '基。task_local_storage'-Method
task_local_storage(body, key, value)
调用具有更改存储的`body’函数,该函数是任务本地的,其中’value’值被分配给’key`;以前的’key’值或其缺失稍后恢复。 适用于模拟动态范围。
规划
# '基。yield'—Function
yield()
切换到调度程序以允许执行另一个计划任务。 调用此函数的任务仍然准备好执行,如果没有其他任务准备好执行,将立即重新启动。
yield(t::Task, arg = nothing)
调度不准确的`schedule(t,arg);yield()的快速版本,在调用调度程序之前立即输出`t
。
# '基。yieldto'-Function
yieldto(t::Task, arg = nothing)
切换到给定的任务。 第一次切换时,调用task函数时不带参数。 在随后的开关上,返回最后一个任务"yieldto"调用的"arg"。 这是一个低级调用,它只切换任务,而不考虑状态或调度。 不建议使用它。
# '基。附表'-Function
schedule(t::Task, [val]; error=false)
如果指定了第二个参数’val`,它将传递给任务(通过返回值 'yieldto')重新启动时。 如果’error’具有值’true',则在唤醒的任务中将该值作为异常调用。
对于已经启动的任意"任务"使用"计划"是不正确的。 有关详细信息,请参阅 API帮助。 |
默认情况下,针对任务,固定位设置为true`t.sticky`。 这模仿了传统的默认行为 '@async'。 固定任务只能在最初计划它们的工作流中启动,并且在计划之后,计划它们的任务也将成为固定任务。 确保行为 '线程。@spawn`,手动将钉扎位设置为’false'。 |
例子
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同步
# '基。errormonitor'-Function
errormonitor(t::Task)
当任务’t’失败时,将错误日志输出到’stderr'。
例子
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
# '基。@sync'-Macro
@sync
等待直到所有词汇确定的使用 '@异步', @spawn
,'分布式。@spawnat’和’分布式。@distributed’将被执行。 嵌套异步操作引发的所有异常都被收集并发出为 'CompositeException'。
例子
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
# '基。等待'-Function
特别注意事项 '线程。条件'。
呼叫者必须持有锁(lock'
),它拥有’线程。条件,在调用此方法之前。 调用任务将被阻塞,直到它被其他任务唤醒,通常是通过调用 `notify'对于相同的’Threads’对象。条件'。 如果执行了锁,它将被原子释放(即使它是递归执行的),并将在控件返回之前重新获取。
wait(r::Future)
等待指定的值可用 '未来`。
wait(r::RemoteChannel, args...)
等待一个值在指定的值上可用 'RemoteChannel`。
wait([x])
阻止当前任务,直到发生事件,具体取决于参数的类型。:
"等待"通常在"while"循环中被调用,以确保在继续之前满足预期条件。
wait(c::Channel)
阻止任务,直到’通道’准备就绪('已经准备好')。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # задача блокируется, так как канал не готов
false
julia> put!(c, 1);
julia> istaskdone(task) # теперь задача разблокирована
true
# '基。获取'-Method</no-翻译>
fetch(t::Task)
等待任务完成 'Task',然后返回其结果的值。 如果任务失败并出现异常,则发出 'TaskFailedException'(封装失败的任务)。
# '基。timedwait'-Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
等待’testcb()将返回值`true
,或者当`timeout’秒数过去时,取决于之前发生的情况。 测试功能每`pollint`秒轮询一次。 Pollint的最小值为0.001s,即1毫秒。
返回`:ok’或':timed_out'。
例子
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
# '基。事件'-Type</no-翻译>
Event([autoreset=false])
创建级别触发的事件源。 导致的任务 wait'
对于"事件",暂停和排队,直到调用 `通知为’事件'。 调用`notify`后,'Event’保持在alarm状态,tasks将不再阻塞,在调用`reset`时等待它。
如果’autoreset’设置为true,则每个`notify`调用的`wait`最多将释放一个任务。
这使得能够在接收和释放存储器以用于通知或等待时组织存储器。
兼容性:Julia1.1
此功能需要至少1.1的Julia版本。 |
兼容性:Julia1.8
'Autoreset’功能和内存排序保证要求Julia版本不低于1.8。 |
# '基。通知'-Function
notify(condition, val=nothing; all=true, error=false)
通过将它们传递给`val’来唤醒等待条件的任务。 如果’all’设置为’true'(默认情况下),则唤醒所有挂起的任务,否则只有一个是。 如果’error’具有值’true',则在唤醒的任务中将传递的值作为异常调用。
返回已唤醒任务的数量。 如果没有任务等待`condition',则返回0。
# '基。信号量'-Type
Semaphore(sem_size)
创建一个计数信号量,允许您随时使用接收的最大"sem_size"。 每张收据必须对应一项豁免。
这使得能够在接收和释放时布置存储器以用于接收或释放。
# '基。获取'-Function
acquire(s::Semaphore)
等待其中一个"sem_size"权限可用,锁定它直到可以获得。
acquire(f, s::Semaphore)
从信号量接收到`s`后执行`f`,并在完成或错误时释放(`release')。
例如,do-block表单,它保证只有两个对`foo`的调用,将在同一时间处于活动状态。:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
兼容性:Julia1.8
此方法需要至少1.8的Julia版本。 |
# '基。锁定'-Function
lock(lock)
当锁可用时获取锁。 如果锁已经被另一个任务或另一个线程阻塞,它会等待直到它变得可用。
每个锁必须对应一个解锁(`解锁')。
lock(f::Function, lock)
获取’lock’锁,在持有`lock`锁的情况下执行`f`,并在`f`的控制权返回时释放`lock`锁。 如果锁已经被另一个任务或另一个线程阻塞,它会等待直到它变得可用。
当这个函数返回时,锁被释放,所以调用者不应该尝试解锁它。
另请参阅说明 '@lock'。
兼容性:Julia1.7
供使用 'Channel'需要Julia至少1.7的版本作为第二个参数。 |
锁定(F::函数,l::可锁定)
获取与"l"相关联的锁,在持有锁的情况下执行"f",并在"f"的控制权返回时释放锁。 'f’将接收一个位置参数:包含在`l`中的值。 如果锁已经被另一个任务或另一个线程阻塞,它会等待,直到它变得可用。 当这个函数返回时,锁被释放,所以调用者不应该尝试解锁它。
兼容性:Julia1.11
要求Julia的版本不低于1.11。 |
# '基。trylock'-Function
trylock(lock) -> Success (Boolean)
如果锁可用,则获取锁,并在成功时返回值"true"。 如果锁已经被另一个任务或线程阻塞,则返回’false'。
每个成功的"trylock"锁必须对应一个解锁(`解锁')。
"Trylock"功能与 'islocked`可用于编写检查-检查-安装算法或指数延迟算法_if支持`typeof(lock)'_(查看相关文档)。
# '基。islocked`-Function
islocked(lock) -> Status (Boolean)
检查"锁"是否被任何任务或线程持有。 此函数本身不应用于同步。 但是,'islocked’功能与 'trylock'可用于编写检查-检查-安装算法或指数延迟算法_if支持`typeof(lock)'_(查看相关文档)。
高级帮助
例如,指数延迟可以如下实现,如果`锁`实现满足下面描述的属性。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
实施
要实现锁,建议使用以下属性定义`islocked`,并在相应的docstring中指定这一点。
-
'Islocked(锁定)`功能不受"数据竞争"的影响。
-
如果’islocked(lock)
返回`false
,如果没有其他任务的干扰,立即调用`trylock(lock)`应该是成功的(返回`true')。
# '基。ReentrantLock'-Type
ReentrantLock()
调用’lock’也会阻止在相应的’unlock`之前执行此线程的终止方法。 使用下面所示的标准锁定模式自然应该得到支持,但要注意反转锁定尝试的顺序或完全跳过重试块(例如,在锁定仍在持有时尝试返回):
这使得能够在接收和释放用于阻塞或解除阻塞呼叫时组织存储器。
lock(l) try <atomic work> finally unlock(l) end
如果 `!islocked(lck::ReentrantLock)'正在举行, 'trylock(lck’成功,除非有其他任务试图在同一时间持有锁。
# '基。可锁定`-Type
可锁定(值,锁定=可重入锁())
兼容性:Julia1.11
要求Julia的版本不低于1.11。 |
例子
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # для доступа к значению должна удерживаться блокировка
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
渠道
# '基。频道'-Type
Channel{T=Any}(size::Int=0)
'Channel(0)创建一个无缓冲通道。 "放!'执行锁定,直到相应的’采取!'被称为。
. 反之亦然。
其他构造函数:
-
'Channel(
':默认构造函数,相当于'+Channel{Any}(0)+
-
'Channel(Inf’:相当于'Channel{Any}(typemax(Int))`
-
'Channel(sz)':相当于'Channel{Any}(sz)`
兼容性:Julia1.3
默认构造函数是`Channel()`和`size=0`是在Julia1.3中默认添加的。 |
# '基。频道'-Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
创建一个基于"func"的新任务,将其绑定到一个大小为"size"的"T"类型的新通道,并安排任务;所有这些都是在一次调用中完成的。 当任务完成时,通道会自动关闭。
'func’应该将链接通道作为唯一的参数。
如果您需要指向已创建任务的链接,请传递对象'Ref{Task}`通过命名参数’taskref'。
如果’spawn=true`,则为`func’创建的’Task’可以并行调度到另一个线程,这相当于通过 '线程。@spawn'。
如果’spawn=true’并且没有设置’threadpool’参数,则默认为`:default'。
如果设置了’threadpool`参数(为`:default`或`:interactive`),则意味着`spawn=true’并在指定的线程池中创建新任务。
返回"通道"。
例子
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
创建指向已创建任务的链接
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
兼容性:Julia1.3
在Julia1.3中添加了`spawn='参数。 这个构造函数是在Julia1.3中添加的。 在早期版本中,Julia通道使用关键字参数来设置’size’和’T',但不推荐使用这些构造函数。 |
兼容性:Julia1.9
在Julia1.9中添加了`threadpool='参数。 |
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
# '基。isready'-Method
isready(c::Channel)
确定是否具有 'Channel'存储在其中的值。 立即返回控制权,锁不执行。
对于无缓冲通道,如果有任务等待,则返回值"true"。 '放!`.
例子
缓冲通道:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
无缓冲通道:
julia> c = Channel();
julia> isready(c) # нет задач, ожидающих put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # планирование задачи put!
julia> isready(c)
true
# '基。获取'-Method</no-翻译>
fetch(c::Channel)
等待并返回(不删除)"通道"中的第一个可用元素。 注意。 在无缓冲(零大小)的"通道"中不支持"fetch"。
例子
缓冲通道:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # элемент не удаляется
3-element Vector{Any}:
1
2
3
# '基。绑定'-Method</no-翻译>
bind(chnl::Channel, task::Task)
将`chnl`的生存期与任务进行比较。 '通道’当任务完成时,'chnl`会自动关闭。 任务中任何未检测到的异常都会传播到`chnl`的所有挂起对象。
无论任务完成与否,都可以显式关闭’chnl`对象。 完成任务不会影响已经关闭的"通道"对象。
当通道链接到多个任务时,要终止的第一个任务将关闭通道。 当多个通道链接到同一任务时,终止该任务将关闭所有相关通道。
例子
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
朱莉娅>对于我在c
@展示我
结束;
i=1
i=2
i=3
i=4
朱莉娅>isopen(c)
错误
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
使用"计划"和"等待"进行低级同步
最简单的方法是正确使用它 'schedule`在尚未启动(计划)的`Task’中。 然而 "附表`及 'wait'可以用作构建同步接口的低级标准块。 调用`schedule(task)最重要的前提是调用者必须"拥有"`task
。 也就是说,她必须知道在这个`任务`中对`等待`的调用发生在调用`计划(任务)'的代码已知的地方。 提供这种先决条件的一种策略是使用原子操作,如下面的示例所示:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
# уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
# задачу ожидания.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Поскольку предполагается, что существует только одна задача уведомления (для простоты),
# мы знаем, что другим возможным случаем является OWE_EMPTY.
# Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
# вызвала `wait(ev::OneWayEvent)`.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
# вызовет переход OWE_WAITING -> OWE_NOTIFYING. Задача ожидания должна
# немедленно вызвать`wait()`. В частности, она не должна вызывать никаких функций, которые
# могут выдавать данные в планировщик в этой точке кода.
wait()
else
@assert state == OWE_NOTIFYING
# В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
# задачей уведомления.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# вывод
notifying...
done
'OneWayEvent’允许一个任务等待('wait`)来自另一个任务的通知(`notify')。 这是一个有限的通信接口,因为`wait’只能从单个任务中使用一次(注意’ev的非原子赋值。任务')。
在这个例子中,'notify(ev::OneWayEvent)可以调用`schedule(ev.task)'当且仅当_on_将状态从`OWE_WAITING’更改为’OWE_NOTIFYING'。 这样我们就知道执行`wait(ev::OneWayEvent)`的任务现在在`ok`分支中,并且不可能有其他任务试图执行`schedule(ev.任务)
,因为他们的宏是'@atomicreplace(ev。state,state=>OWE_NOTIFYING)`将失败。