Engee 文档

多线程

本文介绍了Julia的多线程特性https://julialang.org/blog/2019/07/multithreading /[博客文章]。

使用多个线程运行Julia

默认情况下,Julia使用单个执行线程运行。 您可以使用命令检查此 '线程。nthreads()`

julia> Threads.nthreads()
1

控制线程的数量由命令行参数`-t`/--threads`或环境变量控制 'JULIA_NUM_THREADS'。 当两者都被指定时,-t`/`--threads’将具有优先级。

线程数可以设置为整数(--threads=4),也可以设置为’auto'('--threads=auto'`,其中’auto’尝试输出有用的默认线程数来使用(有关更多信息,请参阅页面 命令行选项)。

兼容性:Julia1.5

命令行参数'-t`/`--threads’要求版本至少为Julia1.5。 在旧版本中,必须使用环境变量。

兼容性:Julia1.7

这样作为环境变量的值 'JULIA_NUM_THREADS'使用’auto',要求版本不低于Julia1.7。 在旧版本中,此值将被忽略。 让我们用四个线程启动Julia。

$ julia --threads 4

让我们检查一下有四条小溪可供我们使用。

julia> Threads.nthreads()
4

但是,我们目前处于主流。 要检查这一点,请使用函数 '线程。threadid'

julia> Threads.threadid()
1

如果您更喜欢使用环境变量,则可以在Bash(Linux/macOS)中进行如下设置:

    export JULIA_NUM_THREADS=4
В оболочке C на Linux/macOS или в CMD на Windows:
    set JULIA_NUM_THREADS=4
В Powershell на Windows:
    $env:JULIA_NUM_THREADS=4
Учтите, что это необходимо сделать *перед* запуском Julia.

由`-t`/--threads`指定的线程数适用于使用命令行选项-p`/--procs`或'--machine-file’创建的工作流。 例如,'julia-p2-t2’创建一个主进程和两个worker,并且为所有三个进程启用了两个线程。 为了更精确地控制工作流程,请使用 `addprocs'并传递'-t/`--threads’作为执行标志('exeflags')。

多个垃圾收集器线程

垃圾回收器可以使用多个线程。 使用的数量要么等于计算工作线程数的一半,要么使用命令行参数"--gcthreads"或环境变量设置。 'JULIA_NUM_GC_THREADS'

兼容性:Julia1.10

命令行参数'--gcthreads’需要不低于Julia1.10的版本。

溪流池

当程序线程忙于多个任务时,任务可能会延迟,这会对程序的响应性和交互性产生负面影响。 要解决此问题,您可以在计划时指定任务是交互式的('线程。@spawn')执行它:

using Base.Threads
@spawn :interactive f()

在交互式任务中,应避免高延迟操作,如果这些是长时间运行的任务,则应经常退出。

Julia可以使用为交互式任务保留的一个或多个线程运行。:

$ julia --threads 3,1

您可以以相同的方式使用环境变量。 'JULIA_NUM_THREADS':

export JULIA_NUM_THREADS=3,1

与此同时,Julia从线程池中的3个线程`:default`和线程池中的1个线程`:interactive`开始:

julia> using Base.Threads

julia> nthreadpools()
2

julia> threadpool() # главный поток находится в пуле интерактивных потоков
:interactive

julia> nthreads(:default)
3

julia> nthreads(:interactive)
1

julia> nthreads()
3

不带参数的`nthreads’版本默认返回池中的线程数。

根据Julia环境是使用交互式线程运行,主线程要么在默认线程池中,要么在交互式线程池中。

任何数字或两者都可以替换为单词’auto`,从而导致Julia选择合理的默认值。

`@Threads’宏

让我们举一个基于我们自己的流的简单例子。 创建一个零数组。

julia> a = zeros(10)
10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

我们将同时使用四个线程来处理这个数组。 每个线程将在每个位置记录其ID。

Julia支持基于宏的并行循环 '线程。@线程'。 此宏放在`for`循环之前,并向Julia指示该循环是一个多线程区域。

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

迭代区域分布在线程之间,然后每个线程将其ID写入分配的位置。

julia> a
10-element Vector{Float64}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

请注意 '线程。@threads'缺少可选的缩减参数,例如 '@distributed'

在没有数据竞争的情况下使用"@threads"

数据竞争的概念将在本节中详细讨论 线程之间的交互和数据竞争。 就目前而言,只要知道与数据竞争可能导致错误的结果和危险的错误。

假设我们想使`sum_single’函数低于多线程。

julia> function sum_single(a)
           s = 0
           for i in a
               s += i
           end
           s
       end
sum_single (generic function with 1 method)

julia> sum_single(1:1_000_000)
500000500000

简单地添加"@threads"会导致多个线程同时读取和写入"s"的数据竞争。

julia> function sum_multi_bad(a)
           s = 0
           Threads.@threads for i in a
               s += i
           end
           s
       end
sum_multi_bad (generic function with 1 method)

julia> sum_multi_bad(1:1_000_000)
70140554652

请注意,结果并不等于`500000500000`,因为它应该是,并且可能会随着每次计算而改变。

要解决此问题,您可以使用特定于任务的缓冲区将数量拆分为块,而无需竞争。 这里’sum_single`用它自己的内部缓冲区’s’重用。 输入向量’a’被拆分为片段’nthreads()'以进行并行操作。 然后我们使用’线程。@spawn’创建单独汇总每个片段的任务。 最后,我们总结每个任务的结果,再次使用’sum_single`:

julia> function sum_multi_good(a)
           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
           tasks = map(chunks) do chunk
               Threads.@spawn sum_single(chunk)
           end
           chunk_sums = fetch.(tasks)
           return sum_single(chunk_sums)
       end
sum_multi_good (generic function with 1 method)

julia> sum_multi_good(1:1_000_000)
500000500000

缓冲区不应该基于`threadid()来管理,即`buffers=zeros(Threads.nthreads()),由于可以输出同时的任务,即多个同时的任务可以在同一个线程中使用同一个缓冲区,这就产生了数据竞争的风险。 此外,如果有多个线程,任务可以在输出点更改流。 它被称为 任务的迁移

另一种选择是将原子操作与由任务或线程共享的变量一起使用,根据操作的特性,这可能更具生产力。

线程之间的交互和数据竞争

尽管Julia线程可以通过共享内存进行通信,但众所周知,编写不受数据竞争影响的正确多线程代码相当困难。 渠道(`Channel')是线程安全的,可用于安全通信。 下面的部分解释了如何使用 原子操作以避免数据竞争。

消除数据竞争

您全权负责确保您的程序中没有数据竞争。 如果您不遵守此要求,则此处的任何承诺都不保证。 观察到的结果可能是完全不可预测的。

如果有一场比赛,根据朱莉娅的说法,它并不能确保内存安全。 *如果可以在另一个流中写入任何数据,则在读取任何数据时要非常小心,因为这可能导致分段错误甚至更严重的问题。*以下是从不同线程访问全局变量的几种不安全方法。

Thread 1:
global b = false
global a = rand()
global b = true

Thread 2:
while !b; end
bad_read1(a) # Доступ к `a` здесь НЕбезопасен!

Thread 3:
while !@isdefined(a); end
bad_read2(a) # Доступ к `a` здесь НЕбезопасен.

使用锁排除数据竞争

锁定是避免数据竞争并因此编写线程安全代码的重要工具。 锁可以锁定和解锁。 如果一个线程阻塞了一个锁,并且没有解开它,那么它就被称为"持有"它。 如果只有一个锁,我们编写的代码需要持有锁来访问一些数据,我们可以确保多个线程永远不会在同一时间访问相同的数据。 请注意,锁和变量之间的关系是由程序员建立的,而不是程序。

例如,您可以创建一个`my_lock’锁并在更改`my_variable`变量时锁定它。 最简单的方法是使用宏'@lock`:

julia> my_lock = ReentrantLock();

julia> my_variable = [1, 2, 3];

julia> @lock my_lock my_variable[1] = 100
100

当使用具有相同锁和变量的类似模式,但在不同的线程中,操作将免于数据竞争。

上面用`lock`的功能版本描述的操作可以通过以下两种方式执行:

julia> lock(my_lock) do
           my_variable[1] = 100
       end
100

julia> begin
           lock(my_lock)
           try
               my_variable[1] = 100
           finally
               unlock(my_lock)
           end
       end
100

所有三个选项都是等价的。 请注意,最新版本需要明确的"try"块以确保锁定永久解锁,而前两个版本在内部执行此操作。 当更改其他线程可用的数据(例如,为全局作用域或闭包中的变量赋值)时,应始终使用上面显示的锁定模式。 不遵守这一要求可能导致不可预见的严重后果。

原子操作

Julia支持以原子方式访问和更改值,即具有线程安全和预防https://en.wikipedia.org/wiki/Race_condition [比赛条件]。 要提供对值的这种访问,您可以将其放在shell中。 '线程。原子'(并且值必须是原始类型)。 请参阅以下示例。

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Vector{Float64}:
 0.0
 1.0
 7.0
 3.0

julia> i[]
 10

julia> ids
4-element Vector{Float64}:
 1.0
 2.0
 3.0
 4.0

如果我们试图在没有原子性标签的情况下进行加法,那么由于竞争条件,答案可能是不正确的。 没有种族例外会发生什么的一个例子:

julia> using Base.Threads

julia> Threads.nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

每个字段中的原子操作

您还可以使用宏在更详细的级别上使用原子操作。 '@原子', '@atomicswap', '@atomicreplace''@atomiconce'

有关内存模型及其设计的其他方面的详细信息,请参阅https://gist …​github.com/vtjnash/11b0031f2e2a66c9c24d33e810b34ec0 [Julia Atomic Operations Manifesto],稍后将正式发布。

标签'@atomic’可以用于结构声明中的任何字段,然后每个记录也必须标记为'@atomic`并且必须使用原子操作的特定顺序之一(:monotonic:acquire:release:acquire_release`或:sequentially_consistent`)。 对于原子字段的任何读取,您还可以指定原子操作顺序的限制。 如果未指定,则使用非严格单调顺序。

兼容性:Julia1.7

每个字段中的原子操作都需要不低于Julia1.7的版本。

副作用和可变函数参数

使用多线程时,应小心使用不https://en.wikipedia.org/wiki/Pure_function [清洁],因为你可能会得到一个不正确的回应。 例如,具有 名字以`结尾!',默认情况下修改它们的参数,因此不是纯粹的。

@threadcall

外部库,例如,通过调用 'ccall',为Julia中使用的基于任务的I/O机制创建问题。 如果C库执行阻塞操作,则Julia调度程序在调用完成之前无法执行任何其他任务。 (例外情况是使用Julia回调调用自定义C代码,该回调可以放弃控制,或者调用`jl_yield()`函数的C代码,这是等价的 `产量'在C。)

'@threadcall'允许您避免在这种情况下暂停执行。 它在单独的线程中分配C函数的执行。 为此,默认情况下使用四个线程池。 线程池的大小由环境变量’UV_THREADPOOL_SIZE’控制。 使用此宏时,当请求任务(在主Julia事件循环中)正在等待空闲线程或在可用线程中执行函数时,它将控制权交给其他任务。 在这种情况下,`@threadcall’不会终止,直到执行完成。 因此,从用户的角度来看,这个宏是一个阻塞调用,就像其他Julia Api一样。

非常重要的是,被调用的函数不会进行Julia回调,因为这会导致程序崩溃。

在Julia的未来版本中,宏`@threadcall`可能会被删除或更改。

警告

目前,Julia运行时和标准库中的大多数操作都提供线程安全,前提是用户代码不包含数据竞争。 但是,线程支持在某些领域仍然不稳定。 多线程编程不可避免地涉及很多困难,因此如果使用线程的程序以不寻常或不希望的方式行为(例如,崩溃或产生意外结果),怀疑应该主要落在线程之间的

在Julia中使用流时,您需要考虑一些特定的限制和风险。

  • 当在多个线程中同时使用来自Base的集合类型时,如果至少有一个线程修改了集合(常见示例:push!'对于数组或将元素插入`Dict),必须使用手动锁定。

  • 执行任务的分配 '@spawn'是非确定性的,不应该被依赖。

  • 使用依赖于计算资源而不分配内存的任务可以防止垃圾回收在分配内存的其他线程上运行。 在这种情况下,可能需要手动插入对’GC的调用。safepoint`)'以确保它运行。 这种限制将在未来消除。

  • 尽量不要并行使用顶级操作,如`include`或`eval’来定义类型,方法和模块。

  • 请注意,启用线程可能会中断库注册的终结器的操作。 为了自由地为它们使用流,可能需要额外的生态系统改进。 有关详细信息,请参阅 终结者的安全使用

任务的迁移

任务在某个线程中开始运行后,如果它输出一个值,它可以移动到另一个线程。

这样的任务可以使用 @spawn''@threads',尽管@threads`的调度参数`:static’冻结了线程ID。

这意味着在大多数情况下 'threadid()`不应被视为任务中的常量,这意味着它不应用于将缓冲区或有状态对象索引到向量中。

兼容性:Julia1.7

任务的迁移出现在Julia1.7版本中。 在此之前,任务始终保持在启动它们的同一线程中。

安全使用终结器

由于终结器可以中断任何代码的执行,因此在与任何全局状态交互时必须特别小心。 不幸的是,对全局状态的更改是它们使用的主要原因(使用纯函数作为终结器通常没有意义)。 因此,出现了困难的情况。 有几种方法可以解决这个问题。

  1. 在单线程模式下,可以在代码中调用内部函数`jl_gc_enable_finalizers'。 在C语言中,使终结器的启动不被分配在任务关键区域。 这用于许多内部函数(例如,在我们的C锁中),以防止在执行某些操作(增量包加载,代码生成等)时递归。). 阻止和此标志的组合允许您使终结器安全。

  2. 在基本模块的某些元素中实现的另一种策略是显式延迟终结器的启动,直到它可以在没有递归的情况下获取锁。 下面的示例显示了这种策略在分布式应用中。finalize_ref`。

     function finalize_ref(r::AbstractRemoteRef)
     if r.where > 0 # Проверяет, не запущен ли уже финализатор if islocked(client_refs) || !trylock(client_refs) # Задержка запуска финализатора, если невозможно получить блокировку finalizer(finalize_ref, r) return nothing end try # За `lock` всегда должна следовать `try` if r.where > 0 # Здесь нужна повторная проверка # Здесь должна быть фактическая очистка r.where = 0 end finally unlock(client_refs) end end nothing end
3. A related third strategy is to use a yield-free queue. We don't currently
have a lock-free queue implemented in Base, but
`Base.IntrusiveLinkedListSynchronized{T}` is suitable. This can frequently be a
good strategy to use for code with event loops. For example, this strategy is
employed by `Gtk.jl` to manage lifetime ref-counting. In this approach, we
don't do any explicit work inside the `finalizer`, and instead add it to a queue
to run at a safer time. In fact, Julia's task scheduler already uses this, so
defining the finalizer as `x -> @spawn do_cleanup(x)` is one example of this
approach. Note however that this doesn't control which thread `do_cleanup`
runs on, so `do_cleanup` would still need to acquire a lock. That
doesn't need to be true if you implement your own queue, as you can explicitly
only drain that queue from your thread.