Engee 文档

多线程

Threads.@threads [schedule] for ... end

在并行模式下执行’for’循环的宏。 迭代空间分布在大规模任务中。 可以使用`schedule`参数设置此策略。 循环的执行等待所有迭代的评估。

另请参阅说明 @spawn和’pmap’在 '分布式'

高级帮助

语义

除了由schedule设置参数定义更强保证的情况外,`@threads’宏执行的循环具有以下语义。

宏"@threads"以未指定的顺序执行循环主体,可能以多线程模式执行。 它不指定工作角色的确切分配分配和流程。 每次执行的分配可能不同。 循环体的代码(包括从其传递调用的所有代码)不应该对迭代在执行它们的任务或工作流中的分布做出假设。 每次迭代的循环主体应该能够独立于其余迭代逐步演化,并且应该消除数据竞争。 因此,不同迭代中的无效同步可能会导致死锁,而不同步的内存访问会话可能导致未定义的行为。

例如,上述条件假定如下:

  • 迭代中的锁定集必须在同一迭代中解除。

  • 使用`Channel’等基元的迭代之间的交互是不正确的。

  • 仅对不同迭代不共享的位置执行写操作(使用锁或原子操作时除外)。

  • 如果未使用schedule`:static',则值 'threadid()`即使在一次迭代中也可以改变。 请参阅部分 '任务迁移'

规划师

如果没有scheduler参数,则无法配置计划的确切创建,并且在Julia的不同版本中可能会有所不同。 目前,如果没有指定调度程序,则使用':dynamic'。

兼容性:Julia1.5

'Schedule’参数首先在Julia1.5中实现。

':dynamic'(默认)

调度程序`:dynamic’为可用的工作线程动态执行迭代。 当前实现假设每次迭代的工作负载是均匀的。 但是,这一假设可能在未来不再适用。

此调度参数只是基本执行机制的提示。 然而,许多属性是可以预期的。 调度程序':dynamic’使用的’任务’的数量被限制为等于可用工作线程数量的小常量值('线程。threadpoolsize())。 每个任务处理迭代空间的连续区域。 因此,'@threads:dynamic for x in xs;f(x);end`通常比@sync for x in xs;@spawn f(x);end’if`length(xs)`显着超过工作线程数,执行时间`f(x)`相对小于作业生成和同步的成本(通常小于10毫秒)更有效。

兼容性:Julia1.8

Schedule`参数的:dynamic’参数可用,自Julia1.8以来默认使用。

':贪婪`

计划者':贪婪’产生最多 '线程。threadpoolsize(’任务,每个任务处理尽可能多的可迭代值。 一旦任务完成,它就会从迭代器中获取下一个值。 任何单个任务不一定与相邻的迭代器值一起工作。 迭代器可以永远输出值;只需要迭代器接口(没有索引)。

如果单个迭代的工作量不均匀或分布较大,则此计划选项通常非常适合。

兼容性:Julia1.11

Value':贪婪的’schedule’参数首先在Julia1.11中实现。

`:静态'

':Static’调度程序为每个线程创建一个任务,并在它们之间平均分配迭代,将每个特定任务分别分配给每个线程。 特别是,价值 'threadid()`保证在一次迭代中是恒定的。 如果从另一个循环"@threads"或1以外的线程使用,则指定":static"是错误的。

:static’调度是为了支持移植在Julia1.3之前的版本中编写的代码而提供的。 在新的库函数中,不建议使用:static’调度,因为使用此参数的函数不能从任意工作线程调用。

例子

为了演示不同的计划策略,我们可以考虑下面的`busywait`函数,它包含一个给定持续时间的不成功循环,执行一定数量的秒。

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

在示例`:dynamic`中,设置了2秒的周期,因为其中一个空闲线程可以执行两次1秒迭代以完成该周期。

Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

类似于’foreach(f,channel),但是`channel`上的迭代和对`f`的调用由`Threads生成的`ntasks`任务分隔。@产卵。 此函数将等待内部生成的任务完成后再返回控制权。

如果’schedule是FairSchedule`,那么’Threads。foreach’将尝试以允许Julia调度程序更自由地平衡不同线程中工作操作的负载的方式生成任务。 这种方法通常假设每个操作的开销更高,但在存在其他多线程工作负载时,与并行模式下的"StaticSchedule"相比,效率更高。

如果’schedule isa StaticSchedule`,那么’Threads。foreach以比FairSchedule更低的操作开销的方式生成作业,但不太适合负载平衡。 因此,这种方法可能更适合更精确,同质的工作负载,但与其他多线程工作负载并行模式下的`FairSchedule`相比,效率会更低。

例子

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
兼容性:Julia1.6

此功能需要至少1.6的Julia版本。

Threads.@spawn [:default|:interactive] expr

创建 '任务'并适用于它 schedule'用于在指定线程池中的任何可用线程上执行(如果未指定,则为:default')。 任务在流变为可用后分配给它。 要等待任务完成,请致电 `wait'这个宏的结果,或者调用 `fetch'等待完成,然后获取返回值。

值可以使用`$插值到@spawn`中,它将值直接复制到构造的基本闭包中。 这允许您插入变量的_value,将异步代码与当前任务中变量值的更改隔离开来。

如果任务完成,运行任务的线程可能会发生变化,因此`threadid()`不应被视为任务的常量。 有关其他重要警告,请参阅 '任务迁移'并在更广泛 多线程指南。 另见关于 线程池

兼容性:Julia1.3

这个宏首先在Julia1.3中实现。

兼容性:Julia1.4

从Julia1.4开始,可以使用`$'插值值。

兼容性:Julia1.9

线程池可以从Julia1.9版本开始指定。

例子

julia> t() = println("Hello from ", Threads.threadid());

julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
Threads.threadid() -> Int

获取当前执行线程的标识号。 主流具有标识符`1'。

例子

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4

如果任务完成,运行任务的线程可能会发生变化,这被称为 '任务迁移'。 出于这个原因,在大多数情况下,使用`threadid()`来索引缓冲区对象或有状态对象的向量是不安全的。

Threads.maxthreadid() -> Int

获取Julia进程可用线程数(在所有线程池中)的下限,具有原子获取语义。 结果将始终大于或等于 'threadid()`,以及在调用`maxthreadid`之前可能观察到的任何任务的`threadid(task)'。

Threads.nthreads(:default | :interactive) -> Int

获取指定线程池中的当前线程数。 ':Interactive’中的流具有标识号'1:nthreads(:interactive):default`中的流在`nthreads(:interactive)范围内。+(1:nthreads(:default))`。

另见’BLAS的描述。get_num_threads’和’BLAS。标准库中的set_num_threads` 'LinearAlgebra'和’nprocs()` '分发''线程。maxthreadid(’

Threads.threadpool(tid = threadid()) -> Symbol

返回指定流的流池:':default',:interactive’或:foreign'。

Threads.nthreadpools() -> Int

返回当前配置的线程池的数量。

Threads.threadpoolsize(pool::Symbol = :default) -> Int

获取默认线程池(或指定线程池)可用的线程数。

另见’BLAS的描述。get_num_threads’和’BLAS。标准库中的set_num_threads` 'LinearAlgebra'和’nprocs()` '分布式'

Threads.ngcthreads() -> Int

返回当前配置的垃圾回收器线程数。 这包括标记流和平行清洗流两者。

另请参阅该部分 多线程

原子操作

不安全指针操作与加载和保存在C11和C中用类型`_Atomic`和`std::atomic`声明的指针兼容++分别为23. 如果不支持Julia`T`类型的原子加载,则可能会发生错误。

@atomic var
@atomic order ex

如果"ex"是受支持的表达式,则将"var"或"ex"标记为原子执行。 如果未指定’order’值,则默认使用sequentially_consistent。

@atomic a.b.x = new @atomic a.b.x += addend @atomic :release a.b.x = new @atomic :acquire_release a.b.x += addend

执行右侧原子表示的存储操作,并返回新值。

当使用'="时,此操作将转换为对`setproperty!(a.b,:x,new)'。 当使用任何运算符时,此操作将转换为对’modifyproperty!(a.b,:x,+,addend)[2]`。

@atomic a.b.x max arg2 @atomic a.b.x + arg2 @atomic max(a.b.x, arg2) @atomic :acquire_release max(a.b.x, arg2) @atomic :acquire_release a.b.x + arg2 @atomic :acquire_release a.b.x max arg2

执行在右侧原子表示的二进制操作。 将结果保存在第一个参数中的字段中,并返回值'(old,new)'。

此操作转换为对`modifyproperty!(a.b,:x,func,arg2)'。

有关详细信息,请参阅 每个领域的原子操作指南。

例子

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
1

julia> @atomic :sequentially_consistent a.x = 2 # задает поле х объекта а с последовательной согласованностью
2

julia> @atomic a.x += 1 # пошагово увеличивает поле х объекта а с последовательной согласованностью
3

julia> @atomic a.x + 1 # пошагово увеличивает поле х объекта а с последовательной согласованностью
3 => 4

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
4

julia>@atomic max(a.x,10)#将对象a的字段x更改为一致性一致的最大值
4 => 10

julia>@atomic a.x max5#将对象a的字段x以一致的一致性再次更改为最大值
10 => 10
兼容性:Julia1.7

此功能需要Julia至少1.7的版本。

@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

将`new’保存到’a.b.x’并返回`a.b.x’的前一个值。

此操作转换为调用’swapproperty!(a.b,:x,new)'。

有关详细信息,请参阅 每个领域的原子操作指南。

例子

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # заменяет поле х объекта а на значение 4 с последовательной согласованностью
1

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
4
兼容性:Julia1.7

此功能需要Julia至少1.7的版本。

@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

执行由一对原子表示的条件替换,返回值'(old,success::Bool)'。 其中"成功"表示是否执行了替换。

此操作转换为对’replaceproperty!(a.b,:x,期望的,期望的)'。

有关详细信息,请参阅 每个领域的原子操作指南。

例子

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # заменяет поле х объекта а на значение 2, если оно было равно 1, с последовательной согласованностью
(old = 1, success = true)

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
2

julia> @atomicreplace a.x 1 => 2 # заменяет поле х объекта а на значение 2, если оно было равно 1, с последовательной согласованностью
(old = 2, success = false)

julia> xchg = 2 => 0; # заменяет поле х объекта а на значение 0, если оно было равно 2, с последовательной согласованностью

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
0
兼容性:Julia1.7

此功能需要Julia至少1.7的版本。

@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value

如果先前未设置值,则以原子方式执行值的条件赋值,返回值"success::Bool"。 "成功"表示分配是否已完成。

此操作转换为对`setpropertyonce的调用!(a.b,:x,value)'。

有关详细信息,请参阅 每个领域的原子操作指南。

例子

julia> mutable struct AtomicOnce
           @atomic x
           AtomicOnce() = new()
       end

julia> a = AtomicOnce()
AtomicOnce(#undef)

julia> @atomiconce a.x = 1 # присваивает полю х объекта а значение 1, если оно не было задано, с последовательной согласованностью
true

julia> @atomic a.x # получает поле х объекта а с последовательной согласованностью
1

julia> @atomiconce a.x = 1 # присваивает полю х объекта а значение 1, если оно не было задано, с последовательной согласованностью
false
兼容性:Julia1.11

此功能要求Julia的版本不低于1.11。

AtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}

向量资料 'DenseVector{T}定尺寸的。 对它的任何元素的访问都是以原子方式进行的(顺序为:单调`)。 任何元素都必须使用具有显式排序的宏"@atomic"来指定。

每个元素在访问时都是独立的原子,并且不能非原子地指定。 目前,宏"@atomic"和更高级别的接口还没有完全准备好,但未来实现的构建块是内部内置函数的核心。memoryrefget','核心。memoryrefset!','核心。memoryref_isassigned','核心。memoryrefswap!','核心。记忆,记忆!'和’核心。记忆之地!`.

有关详细信息,请参阅 原子操作

兼容性:Julia1.11

这种类型要求Julia版本不低于1.11。

对于一组不安全的函数,也有可选的内存排序选项,选择C/c兼容的。++ 如果为此参数指定了这些原子操作的版本 'unsafe_load', 'unsafe_store!, 'unsafe_swap!, 'unsafe_replace!'unsafe_modify!.

以下Api已被弃用,尽管它们可能在多个版本中得到支持。

Threads.Atomic{T}

它包含对`T’类型对象的引用,只允许原子访问,即以线程安全的方式。

只有某些"简单"类型可以原子地使用,特别是基元布尔类型,整数类型和浮点类型。 这些包括’Bool','Int8'。..'Int128','UInt8'。..`UInt128’和’Float16'。..'Float64'。

可以基于非原子值创建新的原子对象;如果未指定这些对象,则使用空值初始化原子对象。

原子对象可以使用符号`[]`访问:

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

原子操作使用前缀’atomic_`,例如 'atomic_add!, 'atomic_xchg!等等。

Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

原子地执行与`x’的交换的比较操作。

原子地将`x`中的值与`cmp’进行比较。 如果它们相等,'newval’写入’x'。 否则’x’保持不变。 返回`x’的前一个值。 通过将返回的值与`cmp`(使用`===`)进行比较,您可以找出`x`是否已更改,以及它现在是否包含新值`newval'。

有关详细信息,请参阅LLVM`cmpxchg`说明。

此函数可用于实现事务语义。 在交易之前,值被写入’x'。 交易完成后,只有在此期间未更改`x`时,才会保存新值。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

原子地替换`x`中的值。

原子地将`x’中的值替换为’newval'。 返回*previous*值。

有关更多信息,请参阅LLVM手册`atomicrmw xchg'。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

原子地将`val`添加到’x`

以原子的方式执行’x[]=val'。 返回*previous*值。 未定义为'+Atomic{Bool}`.

有关详细信息,请参阅LLVM说明"atomicrmw add"。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

原子地从`x`中减去`val`

以原子的方式执行’x[]-=val'。 返回*previous*值。 未定义为'Atomic{Bool}`.

有关详细信息,请参阅LLVM`atomicrmw子`说明。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
Threads.atomic_and!(x::Atomic{T}, val::T) where T

原子地执行具有`x`和`val`的位操作"和"。

以原子的方式执行’x[]&=val'。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw和`说明。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

原子地执行位操作"而不是"与’x’和’val`

以原子方式执行`x[]=~(x[]&val)`。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw nand`说明。

例子

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
Threads.atomic_or!(x::Atomic{T}, val::T) where T

原子地执行按位操作"或"与’x’和’val'。

以原子方式执行’x[]/=val'。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw或`说明。

例子

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

原子地执行用`x`和`val`排除"或"的按位操作

以原子的方式执行’x[]$=val'。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw xor`说明。

例子

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

朱莉娅>x[]
2
Threads.atomic_max!(x::Atomic{T}, val::T) where T

原子地将`x`和`val’的值的最大值存储在’x`中

以原子方式执行`x[]=max(x[],val)`。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw max`说明。

例子

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
Threads.atomic_min!(x::Atomic{T}, val::T) where T

原子地存储`x`和`val`在`x`中的值的最小值

以原子方式执行`x[]=min(x[],val)`。 返回*previous*值。

有关详细信息,请参阅LLVM`atomicrmw min`说明。

例子

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
Threads.atomic_fence()

插入顺序一致的内存屏障。

插入具有一致的排序语义的内存屏障。 如有必要,算法是可用的,即在接收/释放的顺序不足的情况下。

这很可能是一个非常昂贵的操作。 鉴于Julia中的所有其他原子操作已经具有接收/释放语义,因此在大多数情况下不需要显式障碍。

有关详细信息,请参阅LLVM`fence`说明。

使用libuv线程池调用ccall(实验特性)

@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

宏'@threadcall’的调用方式与 'ccall',但在不同的线程中执行其任务。 如果您想从调用阻塞函数而不阻塞当前的`julia`线程,建议使用它。 多线程模式受libuv线程池大小的限制,该线程池默认包括4个线程,但可以通过设置环境变量`UV_THREADPOOL_SIZE`并重新启动`julia`进程来增加此限制。

请注意,被调用的函数不应该对Julia进行回调。

低级同步原语

这些构建块用于创建常规同步对象。

SpinLock()

创建一个不可读的活动锁"检查-检查-安装"。 递归使用导致死锁。 这种类型的锁只能用于执行不需要太多时间且未被阻止的代码(例如,在I/O期间)。 通常,您可以使用 '重入锁'

每个 'lock'必须伴随 '解锁'。 如果 `!islocked(lck::SpinLock)'正在举行, 'trylock(lck’成功,除非有其他任务试图在同一时间持有锁。

当有大约30个竞争线程时,主动检查-检查-安装锁是最快的。 如果有更多的竞争线程,则应选择其他同步方法。