多线程
访问这个https://julialang.org/blog/2019/07/multithreading/[博客文章]介绍Julia多线程特性。
用多个线程启动Julia
默认情况下,Julia启动时有2个执行线程;1个工作线程和1个交互线程。 这可以通过使用命令进行验证 线程。线程():
julia> Threads.nthreads(:default)
1
julia> Threads.nthreads(:interactive)
1
执行线程的数量可以通过使用 -t/--线程 命令行参数或使用 JULIA_NUM_线程环境变量。 当两者都被指定时,则 -t/--线程 优先。
线程数可以指定为整数(--线程=4)或作为 汽车 (--线程=自动),在哪里 汽车 尝试推断要使用的有用的默认线程数(请参阅 命令行选项以获取更多详细信息)。
见 threadpools如何控制多少 :违约 和 :互动 线程在每个线程池中。
|
兼容性
朱莉娅1.5 |
|
兼容性
朱莉娅1.7使用 |
|
兼容性
Julia1.12默认以1个交互线程和1个工作线程开始,如果通过以下任一操作将线程数设置为1,则在Julia1.12中也是如此 |
让我们用4个线程开始Julia:
$ julia --threads 4
让我们验证有4个线程在我们的处置。
julia> Threads.nthreads()
4
但我们目前正在主线程上。 要检查,我们使用函数 线程。n.线,线
julia> Threads.threadid()
1
|
注意如果您更喜欢使用环境变量,您可以在Bash(Linux/macOS)中按如下方式设置它:
Linux/macOS上的c shell,WINDOWS上的CMD:
Windows上的Powershell:
请注意,必须这样做_before_开始Julia。 |
|
注意指定的线程数 |
多个GC线程
垃圾回收器(GC)可以使用多个线程。 默认情况下使用的数量与计算工作线程匹配,也可以由 --gcthreads 命令行参数或使用 JULIA_NUM_GC_THREADS环境变量。
|
兼容性
朱莉娅1.10 |
有关垃圾回收配置和性能调整的更多详细信息,请参阅 内存管理和垃圾回收。
线程池
当程序的线程忙于运行许多任务时,任务可能会遇到延迟,这可能会对程序的响应性和交互性产生负面影响。 要解决这个问题,您可以指定任务是交互式的,当您 线程。@产卵它:
using Base.Threads
@spawn :interactive f()
交互式任务应避免执行高延迟操作,如果它们是持续时间较长的任务,则应频繁产生。
默认情况下,Julia从一个保留用于运行交互任务的交互线程开始,但是这个数字可以通过:
$ julia --threads 3,1
julia> Threads.nthreads(:interactive)
1
$ julia --threads 3,0
julia> Threads.nthreads(:interactive)
0
环境变量 JULIA_NUM_线程也可以类似地使用:
export JULIA_NUM_THREADS=3,1
这从Julia的3个线程开始 :默认值 线程池和1个线程在 :互动 线程池:
julia> using Base.Threads
julia> nthreadpools()
2
julia> threadpool() # the main thread is in the interactive thread pool
:interactive
julia> nthreads(:default)
3
julia> nthreads(:interactive)
1
julia> nthreads()
3
|
注意通过执行明确要求1个线程 |
|
请注意的零参数版本 |
|
注意根据Julia是否已使用交互式线程启动主线程位于默认线程池或交互式线程池中。 |
任何一个或两个数字都可以用这个词代替 汽车,这导致Julia选择合理的默认值。
该 @线程 宏
让我们使用本机线程来做一个简单的例子。 让我们创建一个零数组:
julia> a = zeros(10)
10-element Vector{Float64}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
让我们使用4个线程同时对这个数组进行操作。 我们将让每个线程将其线程ID写入每个位置。
Julia支持使用 线程。@线程宏。 这个宏贴在一个 为 循环以向Julia指示循环是多线程区域:
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
迭代空间在各线程中分割,之后每个线程将其线程ID写入其分配的位置:
julia> a
10-element Vector{Float64}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
使用 @线程 没有数据竞赛
数据竞赛的概念在 "线程之间的通信和数据竞争"。 目前,我们只知道数据竞争可能导致错误的结果和危险的错误。
比方说,我们想使功能 [医]sum_single 下面是多线程。
julia> function sum_single(a)
s = 0
for i in a
s += i
end
s
end
sum_single (generic function with 1 method)
julia> sum_single(1:1_000_000)
500000500000
简单地添加 @线程 公开具有多个线程读取和写入的数据竞争 s 与此同时。
julia> function sum_multi_bad(a)
s = 0
Threads.@threads for i in a
s += i
end
s
end
sum_multi_bad (generic function with 1 method)
julia> sum_multi_bad(1:1_000_000)
70140554652
请注意,结果不是 500000500000 因为它应该是,并将最有可能改变每个评估。
为了解决这个问题,可以使用特定于任务的缓冲区将总和分割成无竞争的块。 这里 [医]sum_single 被重用,具有自己的内部缓冲区 s. 输入向量 a 最多分为 线程() 块进行并行工作。 然后我们使用 线程。@产卵 创建单独对每个块进行求和的任务。 最后,我们总结每个任务的结果 [医]sum_single 又来了:
julia> function sum_multi_good(a)
chunks = Iterators.partition(a, cld(length(a), Threads.nthreads()))
tasks = map(chunks) do chunk
Threads.@spawn sum_single(chunk)
end
chunk_sums = fetch.(tasks)
return sum_single(chunk_sums)
end
sum_multi_good (generic function with 1 method)
julia> sum_multi_good(1:1_000_000)
500000500000
|
注意缓冲区不应基于 |
另一种选择是对跨任务/线程共享的变量使用原子操作,根据操作的特性,这可能更具性能。
线程之间的通信和数据竞争
虽然Julia的线程可以通过共享内存进行通信,但编写正确且无数据竞争的多线程代码是出了名的困难。 朱莉娅的 频道s是线程安全的,可用于安全通信。 下面还有一些部分解释了如何使用 锁及 原子以避免数据竞争。
在某些情况下,Julia能够检测到安全违规行为,特别是在死锁或其他已知不安全的操作方面,例如屈服于当前正在运行的任务。 在这些情况下,一个 [医并发暴力]被抛出。
数据-竞争自由
您完全有责任确保您的程序是免费的数据竞争,如果您不遵守这一要求,这里没有任何承诺可以假设。 观察到的结果可能是非常不直观的。
如果引入数据竞争,Julia不是内存安全的。 *如果另一个线程可能写入_any_数据,请非常小心读取_any_数据,因为它可能导致分段错误或更糟*。 下面是两种从不同线程访问全局变量的不安全方法:
Thread 1:
global b = false
global a = rand()
global b = true
Thread 2:
while !b; end
bad_read1(a) # it is NOT safe to access `a` here!
Thread 3:
while !@isdefined(a); end
bad_read2(a) # it is NOT safe to access `a` here
使用锁避免数据竞争
避免数据竞争,从而编写线程安全代码的一个重要工具是"锁"的概念。 一个锁可以被锁定和解锁。 如果一个线程锁定了一个锁,而不是解锁它,它被称为"持有"锁。 如果只有一个锁,并且我们编写代码,要求持有锁来访问一些数据,我们可以确保多个线程永远不会同时访问相同的数据。 请注意,锁和变量之间的链接是由程序员而不是程序创建的。
例如,我们可以创建一个锁 我的锁,并在我们改变一个变量时锁定它 我的可变. 这是最简单的 @锁 宏:
julia> my_lock = ReentrantLock();
julia> my_variable = [1, 2, 3];
julia> @lock my_lock my_variable[1] = 100
100
通过使用具有相同锁和变量的类似模式,但在另一个线程上,操作不受数据竞争的影响。
我们可以使用以下功能版本执行上面的操作 锁,锁,通过以下两种方式:
julia> lock(my_lock) do
my_variable[1] = 100
end
100
julia> begin
lock(my_lock)
try
my_variable[1] = 100
finally
unlock(my_lock)
end
end
100
所有三个选项都是等价的。 请注意最终版本如何需要明确的 试试-阻止以确保锁定始终解锁,而前两个版本在内部执行此操作。 当更改其他线程访问的数据(例如分配给全局或闭包变量)时,应该始终使用上面的锁模式。 不这样做可能会产生不可预见的严重后果。
原子操作
Julia支持访问和修改值_atomically_,即以线程安全的方式避免https://en.wikipedia.org/wiki/Race_condition[比赛条件]。 值(必须是基元类型)可以包装为 线程。原子以指示它必须以这种方式访问。 在这里我们可以看到一个例子:
julia> i = Threads.Atomic{Int}(0);
朱莉娅>ids=零(4);
朱莉娅>old_is=零(4);
朱莉娅>线程。@threads for id in1:4
old_is[id]=线程。atomic_add!(i,id)
ids[id]=id
结束
朱莉娅>old_is
4元素向量{Float64}:
0.0
1.0
7.0
3.0
朱莉娅>我[]
10
朱莉娅>ids
4元素向量{Float64}:
1.0
2.0
3.0
4.0
如果我们试图在没有原子标签的情况下进行加法,我们可能会因为竞争条件而得到错误的答案。 一个例子,如果我们不避免比赛会发生什么:
julia> using Base.Threads
julia> Threads.nthreads()
4
julia> acc = Ref(0)
Base.RefValue{Int64}(0)
julia> @threads for i in 1:1000
acc[] += 1
end
julia> acc[]
926
julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)
julia> @threads for i in 1:1000
atomic_add!(acc, 1)
end
julia> acc[]
1000
每场原子
我们也可以在更粒度的层面上使用原子,使用 @原子, @atomicswap, @atomicreplace宏,和 @atomiconce宏。
内存模型的具体细节和设计的其他细节写在https://gist.github.com/vtjnash/11b0031f2e2a66c9c24d33e810b34ec0[Julia Atomics宣言],稍后将正式出版。
结构声明中的任何字段都可以用 @原子,然后任何写入必须标有 @原子 此外,并且必须使用定义的原子顺序之一(:单调, :收购, :发布, :获取_release,或 :sequentially_consistent). 原子字段的任何读取也可以使用原子排序约束进行注释,或者如果未指定,则使用单调(放松)排序来完成。
|
兼容性
Julia1.7每场原子至少需要Julia1.7。 |
副作用和可变函数参数
当使用多线程时,我们必须小心使用不https://en.wikipedia.org/wiki/Pure_function[纯粹]因为我们可能会得到一个错误的答案。 例如,具有 名字结尾 !按照惯例修改他们的论点,因此不是纯粹的。
@threadcall
外部库,例如通过调用的库 ccall,给Julia的基于任务的I/O机制带来了问题。 如果C库执行阻塞操作,则阻止Julia调度程序执行任何其他任务,直到调用返回。 (例外是对自定义C代码的调用,这些代码回调到Julia,然后可能会产生,或者调用c代码 jl_yield(),C等价物 产量.)
该 @threadcall宏提供了一种方法来避免在这种情况下延迟执行。 它调度一个C函数在一个单独的线程中执行。 为此使用默认大小为4的threadpool。 Threadpool的大小通过环境变量控制 UV_THREADPOOL_SIZE. 在等待空闲线程时,以及在函数执行期间,一旦线程可用,请求任务(在主Julia事件循环上)会屈服于其他任务。 请注意 @threadcall 执行完成后才返回。 从用户的角度来看,它因此与其他Julia Api一样是一个阻塞调用。
非常重要的是,被调用的函数不会回调到Julia,因为它会segfault。
@threadcall 可能会在Julia的未来版本中删除/更改。
注意事项
此时,如果用户代码不存在数据竞争,则Julia运行时和标准库中的大多数操作都可以以线程安全的方式使用。 但是,在某些领域,稳定线程支持的工作正在进行中。 多线程编程有许多固有的困难,如果使用线程的程序表现出不寻常或不希望的行为(例如崩溃或神秘的结果),通常应该首先怀疑线程交互。
在Julia中使用线程时,需要注意一些特定的限制和警告:
*基本集合类型需要手动锁定,如果多个线程同时使用,其中至少有一个线程修改集合(常见示例包括 推! 在数组上,或将项插入到 Dict,Dict).
*使用的时间表 @产卵是不确定的,不应该依赖。
*计算绑定的非内存分配任务可以阻止垃圾回收在分配内存的其他线程中运行。 在这些情况下,可能需要插入手动调用 GC。安全点() 以允许GC运行。 这一限制将在未来被删除.
*避免运行顶级操作,例如 包括,或 埃瓦尔 并行定义类型、方法和模块。
*请注意,如果启用了线程,库注册的终结器可能会中断。 这可能需要在整个生态系统中进行一些过渡性的工作,然后线程才能被广泛采用。 请参阅 终结者的安全使用了解更多详情。
安全使用终结器
因为终结器可以中断任何代码,所以它们必须非常小心地与任何全局状态交互。 不幸的是,使用终结器的主要原因是更新全局状态(纯函数作为终结器通常毫无意义)。 这给我们带来了一个难题。 有几个方法来处理这个问题:
-
当单线程时,代码可以调用内部
jl_gc_enable_finalizersC函数防止终结器在关键区域内被调度. 在内部,这在一些函数(例如我们的C锁)内部使用,以防止在执行某些操作(增量包加载,codegen等)时递归。). 锁和此标志的组合可用于使终结器安全。 -
Base在几个地方使用的第二种策略是显式延迟终结器,直到它可以非递归地获取其锁。 下面的示例演示如何将此策略应用于
分布。finalize_ref:
function finalize_ref(r::AbstractRemoteRef)
if r.where > 0 # Check if the finalizer is already run
if islocked(client_refs) || !trylock(client_refs)
# delay finalizer for later if we aren't free to acquire the lock
finalizer(finalize_ref, r)
return nothing
end
try # `lock` should always be followed by `try`
if r.where > 0 # Must check again here
# Do actual cleanup here
r.where = 0
end
finally
unlock(client_refs)
end
end
nothing
end
-
相关的第三种策略是使用无良率队列。 我们目前没有在Base中实现无锁队列,但是
基地。IntrusiveLinkedListSynchronized的{T}是合适的。 对于具有事件循环的代码,这通常是一个很好的策略。 例如,这种策略是由Gtk。jl管理生命周期重新计数。 在这种方法中,我们不会在终结器,终结器,而是将其添加到队列中,以便在更安全的时间运行。 事实上,Julia的任务调度程序已经使用了这个,所以将终结器定义为x->@产卵do_cleanup(x)就是这种方法的一个例子。 但是请注意,这不能控制哪个线程do_cleanup继续,所以do_cleanup还需要一把锁。 如果您实现自己的队列,则不需要这样做,因为您可以显式地只从线程中耗尽该队列。