AnyMath 文档

多处理和分布式计算

一种分布式内存并行计算的实现由模块提供 分发的作为Julia附带的标准库的一部分。

大多数现代计算机拥有多个CPU,并且可以将多个计算机组合在一个群集中。 利用这些多个Cpu的强大功能,可以更快地完成许多计算。 影响性能的主要因素有两个:Cpu本身的速度,以及它们访问内存的速度。 在集群中,很明显,给定的CPU可以最快地访问同一台计算机(节点)内的RAM。 也许更令人惊讶的是,类似的问题在典型的多核笔记本电脑上也是相关的,因为主内存和https://www.akkadia.org/drepper/cpumemory.pdf因此,一个好的多处理环境应该允许特定CPU控制一个内存块的"所有权"。 Julia提供了一个基于消息传递的多处理环境,允许程序一次在不同内存域中的多个进程上运行。

Julia对消息传递的实现不同于MPI[1].Julia中的通信通常是"片面的",意味着程序员需要在两个进程的操作中只显式地管理一个进程。 此外,这些操作通常看起来不像"消息发送"和"消息接收",而是类似于对用户函数的调用等更高级别的操作。

Julia中的分布式编程建立在两个基元之上:remote references_和_remote calls。 远程引用是可用于从任何进程引用存储在特定进程上的对象的对象。 远程调用是一个进程请求在另一个(可能是相同的)进程上的某些参数上调用某个函数。

远程引用有两种风格: 未来远程通道.

另一方面, 远程通道s是可重写的。 例如,多个进程可以通过引用同一个远程来协调其处理 频道.

每个进程都有一个相关联的标识符。 提供交互式Julia提示的过程始终具有 身份证 等于1。 默认情况下用于并行操作的进程称为"worker"。 当只有一个进程时,进程1被认为是一个worker。 否则,工人被认为是进程1以外的所有进程。 因此,需要添加2个或更多的过程才能从并行处理方法中获得好处,例如 pmap. 如果您只是希望在worker上运行长计算时在主进程中执行其他操作,则添加单个进程是有益的。

让我们试试这个。 从 朱莉娅-p n 提供 n 本地计算机上的工作进程。 一般来说,这是有道理的 n 以等于机器上的CPU线程(逻辑内核)的数量。 请注意, -p 参数隐式加载模块 分发的.

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Matrix{Float64}:
 1.18526  1.50912
 1.16296  1.60607

第一个论点 远程呼叫是调用的函数。 Julia中的大多数并行编程都没有引用特定的进程或可用的进程数,但 远程呼叫被认为是提供更精细控制的低级接口。 第二个论点 远程呼叫身份证 完成工作的进程,其余的参数将传递给被调用的函数。

正如您所看到的,在第一行中,我们要求过程2构建一个2乘2的随机矩阵,在第二行中,我们要求它添加1。 这两个计算的结果可在两个期货, rs. 该 @产卵宏在第一个参数指定的进程上计算第二个参数中的表达式。

有时您可能需要立即远程计算值。 当您从远程对象读取以获取下一个本地操作所需的数据时,通常会发生这种情况。 的功能 remotecall_fetch为此而存在。 它相当于 fetch(remotecall(...)) 但效率更高。

julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085

这将获取worker2上的数组并返回第一个值。 请注意, 取货/取货 在这种情况下不会移动任何数据,因为它是在拥有数组的worker上执行的。 也可以写:

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866

记住这一点 getindex(r,1,1)等同r[1,1],所以这个调用获取未来的第一个元素 r.

为了使事情更容易,符号 :任何 可以传递给 @产卵,它为你挑选在哪里做手术:

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Matrix{Float64}:
 1.38854  1.9098
 1.20939  1.57158

请注意,我们使用了 1 .+取(r) 而不是 1 .+r. 这是因为我们不知道代码将在哪里运行,所以一般来说a 取货/取货可能需要搬家 r 到做加法的过程中。 在这种情况下, @产卵是否足够聪明,可以对拥有的进程执行计算 r,所以 取货/取货将是一个no-op(没有完成任何工作)。

(值得注意的是, @产卵不是内置的,而是在Julia中定义为 。 可以定义自己的此类构造。)

要记住的一件重要的事情是,一旦被取走,一个 未来将在本地缓存其值。 进一步 取货/取货呼叫不需要网络跳。 一旦所有引用 未来s已提取,远程存储的值被删除。

线程。@产卵类似于 @产卵,但仅在本地进程上运行任务。 我们用它来为每个过程创建一个"馈线"任务。 每个任务都会选择下一个需要计算的索引,然后等待其进程完成,然后重复执行,直到索引用完为止。 请注意,馈线任务不会开始执行,直到主要任务到达 @同步块,此时它放弃控制并等待所有本地任务完成,然后再从函数返回。 至于v0.7及以后,馈线任务能够通过 n.下一组 因为它们都运行在同一个进程上。 即使 任务 协同调度,在某些情况下可能仍然需要锁定,如 异步I/O。 这意味着上下文切换只发生在明确定义的点:在这种情况下,当 remotecall_fetch被调用。 这是当前的实现状态,它可能会在未来的Julia版本中改变,因为它的目的是使它可以运行到N 任务 在M上 过程,又名https://en.wikipedia.org/wiki/Thread_(计算)#Models[M:N Threading]. 然后一个锁获取\释放模型 n.下一组 这是需要的,因为让多个进程在同一时间读写一个资源是不安全的。

代码可用性和加载包

您的代码必须在运行它的任何进程上可用。 例如,在Julia提示符中键入以下内容:

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

朱莉娅>rand2(2,2)
2×2矩阵{Float64}:
 0.153756  0.368514
 1.15119   0.918912

julia>fetch(@spawnat:any rand2(2,2))
错误:RemoteException(2,CapturedException(UndefVarError(Symbol("#rand2")))))
[医]堆垛机:
[...]

进程1知道函数 兰德2,但过程2没有。

大多数情况下,您将从文件或包加载代码,并且您在控制哪些进程加载代码方面具有相当大的灵活性。 考虑一个文件, DummyModule。jl,包含以下代码:

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

为了参考 [医]类型 跨所有流程, DummyModule。jl 需要在每个进程上加载。 打电话来 包括("DummyModule.jl") 仅在单个进程上加载它。 要在每个进程上加载它,请使用 @无处不在宏(以Julia开头 朱莉娅-p2):

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

像往常一样,这不会带来 [医]假肢 进入任何过程的范围,这需要 使用进口. 而且,当 [医]假肢 被带入一个过程的范围,而不是在任何其他过程中:

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined in `Main`
⋮

julia>fetch(@spawnat2DummyModule。MyType(7))
类型(7)

然而,它仍然是可能的,例如,发送一个 [医]类型 到已加载的进程 [医]假肢 即使不在范围内:

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

一个文件也可以在启动时预加载到多个进程上。 -L 标志,并且可以使用驱动程序脚本来驱动计算:

julia -p <n> -L file1.jl -L file2.jl driver.jl

在上面的示例中运行驱动程序脚本的Julia进程具有 身份证 等于1,就像提供交互提示的进程一样。

最后,如果 DummyModule。jl 不是一个独立的文件,而是一个包,那么 使用DummyModule 将_load_ DummyModule。jl 在所有的过程中,但只把它放在过程的范围内。 xref:base/base.adoc#using[使用 被召唤了。

启动和管理工作进程

基本Julia安装内置了对两种类型集群的支持:

*使用 -p 选项如上所示。 *使用 --机器-档案 选择。 这使用passwordless ssh 登录以在指定的机器上启动Julia工作进程(从与当前主机相同的路径)。 每个机器定义采用以下形式 [计数*][用户@]主机[:端口][bind_addr[:端口]]. 用户 默认为当前用户, 港口 到标准ssh端口。 计数 节点上要生成的worker数,默认为1。 可选项 bind-to bind_addr[:端口] 指定其他工作程序应用于连接到此工作程序的IP地址和端口。

注意虽然Julia通常致力于向后兼容性,但将代码分发到工作进程依赖于 序列化。序列化. 正如在相应的文档中指出的那样,这不能保证在不同的Julia版本上工作,因此建议所有机器上的所有worker都使用相同的版本。

功能 addprocs, rmprocs, 工人,以及其他可用作添加、删除和查询集群中的进程的编程方法。

julia> using Distributed

julia> addprocs(2)
2-element Vector{Int64}:
 2
 3

模块 分发的必须在调用主进程之前显式加载 addprocs. 它自动在工作进程上可用。

注意注意,工人不运行 ~/.朱莉娅/配置/启动。jl 启动脚本,也不会将其全局状态(如命令行开关、全局变量、新方法定义和加载的模块)与任何其他正在运行的进程同步。 你可以使用 addprocs(exeflags="--project") 要使用特定环境初始化worker,然后 @everywhere使用<modulename>@everywhere include("文件。jl").

其他类型的集群可以通过编写自己的自定义来支持 集群管理员,如下所述在 ClusterManagers部分。

数据移动

在分布式程序中,发送消息和移动数据构成了大部分开销。 减少消息数量和发送的数据量对于实现性能和可扩展性至关重要。 为此,了解Julia的各种分布式编程结构执行的数据移动非常重要。

取货/取货可以被认为是显式的数据移动操作,因为它直接要求将对象移动到本地机器。 @产卵(以及一些相关的构造)也移动数据,但这并不那么明显,因此它可以称为隐式数据移动操作。 考虑这两种方法来构造和平方随机矩阵:

方法1:

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

方法2:

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

julia> fetch(Bref);

差异似乎微不足道,但实际上是相当显着的,由于行为 @产卵. 在第一种方法中,在本地构造一个随机矩阵,然后发送到另一个进程,在那里它被平方。 在第二种方法中,在另一个过程中构造和平方随机矩阵。 因此,第二种方法发送的数据比第一种方法少得多。

在这个玩具例子中,这两种方法很容易区分和选择。 然而,在一个真正的程序中,设计数据移动可能需要更多的思考和可能的一些测量。 例如,如果第一个过程需要矩阵 A 那么第一种方法可能会更好。 或者,如果计算 A 是昂贵的,只有当前进程有它,然后将其移动到另一个进程可能是不可避免的。 或者,如果当前过程在 @产卵取(Bref),完全消除并行性可能会更好。 或者想象一下 兰特(1000,1000) 被替换为更昂贵的操作。 那么添加另一个可能是有意义的 @产卵声明只是为了这一步。

全局变量

通过远程执行的表达式 @产卵,或为使用远程执行指定的闭包 远程呼叫可以指全局变量。 模块下的全局绑定 主要 与其他模块中的全局绑定相比,处理方式略有不同。 考虑以下代码片段:

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

在这种情况下 总和必须在远程进程中定义。 请注意 A 是在本地工作区中定义的全局变量。 Worker2没有一个名为 A主要. 运输关闭的行为 ()->总和(A) 工人2的结果 主要。A 被定义在2上。 主要。A 即使在调用后仍继续存在于worker2上 remotecall_fetch返回。 具有嵌入式全局引用的远程调用(下 主要 仅模块)管理全局变量,如下所示:

*如果作为远程调用的一部分引用目标工作程序,则会在目标工作程序上创建新的全局绑定。 *全局常量也在远程节点上声明为常量。 *全局变量仅在远程调用的上下文中被重新发送到目标worker,并且仅在其值已更改时才被重新发送。 此外,群集不会跨节点同步全局绑定。 例如:

+

A = rand(10,10)
remotecall_fetch(()->sum(A), 2) # worker 2
A = rand(10,10)
remotecall_fetch(()->sum(A), 3) # worker 3
A = nothing

+ 执行上述代码段会导致 主要。A 在工人2具有不同的值 主要。A 在工人3,而值 主要。A 在节点1上设置为 什么都没有.

正如您可能已经意识到的那样,虽然在主节点上重新分配全局变量时可能会收集与全局变量相关联的内存,但由于绑定继续有效,因此不会对工作人员 清楚!可用于手动将远程节点上的特定全局变量重新分配给 什么都没有 一旦他们不再需要。 这将释放与它们相关联的任何内存,作为常规垃圾回收周期的一部分。

因此,程序应该小心地在远程调用中引用全局变量。 事实上,如果可能的话,最好完全避免它们。 如果必须引用全局变量,请考虑使用 块来本地化全局变量。

例如:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

可以看出,全局变量 A 是在worker2上定义的,但是 B 被捕获为局部变量,因此为 B worker2上不存在。

并行映射和循环

幸运的是,许多有用的并行计算不需要数据移动。 一个常见的例子是蒙特卡罗模拟,其中多个过程可以同时处理独立的模拟试验。 我们可以使用 @产卵在两个过程中翻转硬币。 首先,将以下函数写入 count_heads。jl:

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

的功能 计数头 简单地加在一起 n 随机比特。 以下是我们如何在两台机器上进行一些试验,并将结果加在一起:

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

此示例演示了一种功能强大且经常使用的并行编程模式。 许多迭代在多个进程上独立运行,然后使用某些函数组合它们的结果。 组合过程被称为_reduction_,因为它通常是张量-秩-减少:数字的向量减少到单个数字,或者矩阵减少到单个行或列等。 在代码中,这通常看起来像模式 x=f(x,v[i]),在哪里 x 是累加器, f 是减少函数,并且 v[i] 元素是否被减少。 这是可取的 f 是关联的,这样操作以什么顺序执行就无关紧要了。

请注意,我们使用这种模式与 计数头 可以一概而论。 我们使用了两个显式 @产卵语句,这将并行性限制为两个进程。 要在任意数量的进程上运行,我们可以使用_parallel for loop_,在分布式内存中运行,可以使用Julia编写 @分布式像这样:

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

此构造实现了将迭代分配给多个进程的模式,并将它们与指定的减少(在这种情况下 (+)). 每次迭代的结果将作为循环内最后一个表达式的值。 整个并行循环表达式本身计算到最终答案。

请注意,虽然parallel for loops看起来像serial for loops,但它们的行为却截然不同。 特别是,迭代不会以指定的顺序发生,并且对变量或数组的写入不会全局可见,因为迭代在不同的进程上运行。 在并行循环中使用的任何变量都将被复制并广播到每个进程。

例如,以下代码将无法按预期工作:

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

此代码不会初始化所有 a,因为每个进程将有一个单独的副本。 必须避免像这样的循环并行。 幸运的是, 共享数组可以用来绕过这个限制:

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

如果变量是只读的,则在并行循环中使用"外部"变量是完全合理的:

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

在这里,每个迭代都适用 f 从向量中随机选择的样本 a 所有进程共享。

正如您所看到的,如果不需要减少运算符,可以省略它。 在这种情况下,循环异步执行,即它在所有可用的worker上生成独立的任务,并返回 未来立即无需等待完成。 呼叫者可以等待 未来稍后通过调用完成 取货/取货在它们上面,或者在循环结束时通过前缀来等待完成 @同步,像 @同步@分发.

在某些情况下,不需要减少运算符,我们只希望将一个函数应用于某个范围内的所有整数(或者更一般地说,应用于某些集合中的所有元素)。 这是另一个名为_parallel map_的有用操作,在Julia中实现为 pmap功能。 例如,我们可以并行计算几个大随机矩阵的奇异值,如下所示:

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

朱莉娅的 pmap是为每个函数调用做大量工作的情况而设计的。 相比之下, @分发给 可以处理每个迭代很小的情况,也许只是对两个数字求和。 两者都只使用工作进程 pmap@分发给 于并行计算。 在的情况下 @分发给,最终减少是在调用过程上完成的。

远程引用和AbstractChannels

远程引用总是指一个 [医]抽象通道.

一个具体的实现 [医]抽象通道 (像 频道),被要求实施 放!, 拿!, 取货/取货, 已经准备好了等等!. A所指的远程对象 未来存储在一个 频道{Any}(1),即,一个 频道 尺寸1,可容纳 任何 类型。

远程通道,这是可重写的,可以指向任何类型和大小的通道,或任何其他实现 [医]抽象通道.

构造函数 远程通道(F::函数,pid)() 允许我们构造对持有多个特定类型值的通道的引用。 f 是在上执行的函数 pid 它必须返回一个 [医]抽象通道.

例如, 远程通道(()->通道{Int}(10),pid),将返回对类型的通道的引用 Int型 和尺寸10。 通道存在于worker上 pid.

方法 放!, 拿!, 取货/取货, 已经准备好了等等!在一个 远程通道被代理到远程进程上的支持存储上。

远程通道因此可用于引用用户实现 [医]抽象通道 物体。 下面是一个简单的例子 [医]口述通道 它使用字典作为它的远程存储:

julia> struct DictChannel{T} <: AbstractChannel{T}
           d::Dict
           cond_take::Threads.Condition    # waiting for data to become available
           DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
           DictChannel() = DictChannel{Any}()
       end

朱莉娅>开始
       功能基。放!(D::DictChannel,k,v)
           @锁D.cond_take开始
               D.d[k]=v
               通知(d.cond_take)
           结束
           返回D
       结束
       功能基。拿!(D::DictChannel,k)
           @锁D.cond_take开始
               v=取(D,k)
               删除!(D.d,k)
               返回v
           结束
       结束
       基地。isready(D::DictChannel)=@lock D.cond_take!isempty(d.d)
       基地。isready(D::DictChannel,k)=@lock d.cond_take haskey(D.d,k)
       功能基。获取(D::DictChannel,k)
           @锁D.cond_take开始
               等待(D,k)
               返回D.d[k]
           结束
       结束
       功能基。等待(D::DictChannel,k)
           @锁D.cond_take开始
               而!已准备好(D,k)
                   等待(d.cond_take)
               结束
           结束
       结束
       结束;

朱莉娅>d=DictChannel();

朱莉娅>已准备好(d)
错误

朱莉娅>放!(d,:k,:v);

朱莉娅>isready(d,:k)
真的

朱莉娅>fetch(d,:k)
:v

julia>等待(d,:k)

茱莉亚>拿!(d,:k)
:v

朱莉娅>isready(d,:k)
错误

通道和远程通道

*一个 频道是进程的本地。 Worker2不能直接引用a 频道在工人3上,反之亦然。 A 远程通道但是,可以在工人之间放置和获取价值。 *一个 远程通道可以被认为是一个_handle_到一个 频道. *进程id, pid,与一 远程通道标识支持存储的进程,即支持 频道存在。 *任何涉及 远程通道可以从通道放和拿物品。 数据自动发送到(或从)进程a中检索 远程通道相关联。 *序列化a 频道还序列化通道中存在的任何数据。 因此,反序列化它可以有效地复制原始对象。 *另一方面,序列化一个 远程通道仅涉及标识的位置和实例的标识符的序列化 频道由句柄所指。 反序列化的 远程通道对象(在任何worker上),因此也指向与原始存储相同的支持存储。

上面的通道示例可以修改为进程间通信,如下所示。

我们开始4个工人来处理一个单一的 工作机会 远程信道。 作业,由id标识(工作_id),都写入通道。 在这个模拟中,每个远程执行任务读取一个 工作_id,等待一个随机的时间量,并写回一个元组 工作_id,时间和它自己 pid 到结果通道。 最后所有的 结果 在主进程上打印出来。

julia> addprocs(4); # add worker processes

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia>@everywhere function do_work(jobs,results)#定义工作函数everywhere
           虽然真实
               job_id=拿!(职位)
               exec_time=兰德()
               sleep(exec_time)#模拟实际工作经过的时间
               放!(结果,(job_id,exec_time,myid()))
           结束
       结束

julia>函数make_jobs(n)
           对于我在1:n
               放!(工作,我)
           结束
       结束;

朱莉娅>n=12;

julia>errormonitor(线程。@spawn make_jobs(n));#用"n"个作业馈送作业通道

julia>for p in workers()#在workers上启动任务以并行处理请求
           remote_do(do_work,p,jobs,results)
       结束

julia>@elased while n>0#打印出结果
           job_id,exec_time,where=take!(结果)
           println("worker job_id在worker(round(exec_time;digits=2))seconds on worker where where")上完成
           全局n=n-1
       结束
1在工人4上以0.18秒完成
2在工人5上以0.26秒完成
6在工人4上以0.12秒完成
7在工人4上以0.18秒完成
工人5在0.35秒内完成
4在工人2上以0.68秒完成
3在工人3上以0.73秒完成
11在工人3上以0.01秒完成
12在工人3上以0.02秒完成
9在工人5上以0.26秒完成
8在工人4上以0.57秒完成
10在工人2上以0.58秒完成
0.055971741

远程引用和分布式垃圾回收

只有删除集群中的_all_持有引用时,才能释放远程引用引用引用的对象。

存储值的节点会跟踪哪些worker引用了该值。 每一次 远程通道或a(未牵伸) 未来被序列化为worker,则通知引用所指向的节点。 每次 远程通道或a(未牵伸) 未来在本地被垃圾回收,拥有该值的节点再次被通知。 这是在内部群集感知串行器中实现的。 远程引用仅在正在运行的群集的上下文中有效。 序列化和反序列化对常规引用和从常规引用的引用 伊俄 不支持对象。

这些通知是通过发送"跟踪"消息来完成的—​当引用序列化到不同的进程时是"添加引用"消息,当引用被本地垃圾收集时是"删除引用"消息。

未来s是一次写入并缓存在本地, 取货/取货ing a 未来还更新拥有该值的节点上的引用跟踪信息。

一旦清除对该值的所有引用,拥有该值的节点将释放它。

未来`s,序列化一个已经提取的 xref:stdlib/Distributed.adoc#Distributed.Future[`未来到另一个节点也发送该值,因为原始远程存储可能已经收集了该值。

重要的是要注意_when_对象被本地垃圾回收取决于对象的大小和系统中当前的内存压力。

在远程引用的情况下,本地引用对象的大小相当小,而存储在远程节点上的值可能相当大。 由于本地对象可能不会立即收集,因此显式调用是一个很好的做法 最后确定关于a的本地实例 远程通道,或在unfetched 未来[自打电话以来 xref:base/parallel.adoc#Base.fetch-Tuple{Task}[取货/取货在一个 未来也从远程存储中删除其引用,这在提取时不是必需的 未来S.显式调用 最后确定导致立即发送到远程节点的消息继续并删除其对值的引用。

一旦完成,引用将无效,并且不能在任何进一步的调用中使用。

本地调用

数据必须复制到远程节点执行。 这是remotecalls和当数据存储到 远程通道 / 未来在不同的节点上。 正如预期的那样,这会导致远程节点上序列化对象的副本。 但是,当目的节点是本地节点,即调用进程id与远程节点id相同时,作为本地调用执行。 它通常(并不总是)在不同的任务中执行-但没有数据的序列化/反序列化。 因此,调用引用与传递相同的对象实例-不创建副本。 此行为如下所示:

julia> using Distributed

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel created on local node

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # Reusing `v`
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> rc = RemoteChannel(()->Channel(3), workers()[1]);   # RemoteChannel created on remote node

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

可以看出, 放!在本地拥有 远程通道具有相同的对象 v 在调用之间修改会导致存储相同的单个对象实例。 与副本相反 v 当节点拥有时被创建 rc 是不同的节点。

应该指出的是,这通常不是一个问题。 只有当对象在本地存储并在调用后修改时,才需要考虑它。 在这种情况下,储存 深镜,深镜 的对象。

对于本地节点上的远程调用也是如此,如下例所示:

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # Executed on local node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

再次可以看到,对本地节点的远程调用的行为就像直接调用一样。 调用修改作为参数传递的本地对象。 在远程调用中,它对参数的副本进行操作。

重复一遍,一般来说这不是问题。 如果本地节点也被用作计算节点,并且使用的参数在调用后,则需要考虑此行为,并且如果需要,必须将参数的深层副本传递给在本地节点上调用的调用。 对远程节点的调用将始终对参数的副本进行操作。

共享数组

共享数组使用系统共享内存跨多个进程映射同一个数组。 A 共享阵列 当您希望同一台机器上的两个或多个进程共同访问大量数据时,是一个不错的选择。 通过模块提供共享阵列支持 [医]鱼肉,必须明确加载到所有参与的工人。

一个互补的数据结构由外部包提供https://github.com/JuliaParallel/DistributedArrays.jl[脧锚脧赂`分发的。jl`]的形式 阵列;阵列. 虽然有一些相似之处 共享阵列,行为的一个https://github.com/JuliaParallel/DistributedArrays.jl[脧锚脧赂`阵列;阵列`]是完全不同的。 在一个 共享阵列,每个"参与"进程都有权访问整个数组;相比之下,在一个https://github.com/JuliaParallel/DistributedArrays.jl[脧锚脧赂`阵列;阵列`],每个进程只有一个数据块的本地访问权限,没有两个进程共享同一个数据块。

共享阵列索引(赋值和访问值)的工作原理与常规数组一样,并且效率很高,因为底层内存可供本地进程使用。 因此,大多数算法自然地工作在 共享阵列s,尽管是单进程模式。 在算法坚持的情况下 阵列输入,底层数组可以从一个 共享阵列通过调用 斯达塔. 对于其他 抽象阵列 类别, 斯达塔只是返回对象本身,所以使用它是安全的 斯达塔在任何 阵列-类型对象。

共享数组的构造函数的形式为:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

这就产生了一个 N-位类型的维共享数组 T 和尺寸 暗淡无光 跨越由 pids,pids. 与分布式数组不同,共享数组只能从由 pids,pids 命名参数(以及创建过程,如果它在同一主机上)。 请注意,只有元素是 轨道,轨道在SharedArray中支持。

如果 初始化 签名的功能 initfn(S::SharedArray),是指定的,它是呼吁所有参与的工人。 您可以指定每个worker运行 初始化 阵列的不同部分上起作用,从而并行化初始化。

这里有一个简单的例子:

julia> using Distributed

julia> addprocs(3)
3-element Vector{Int64}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedMatrix{Int64}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedMatrix{Int64}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays。本地区提供不相交的一维索引范围,有时便于在进程之间拆分任务。 当然,您可以按照自己的意愿划分工作:

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedMatrix{Int64}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

由于所有进程都可以访问基础数据,因此您必须小心不要设置冲突。 例如:

@sync begin
    for p in procs(S)
        Threads.@spawn begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

会导致未定义的行为。 因为每个进程都用自己的方法填充_entire_数组 pid,无论哪个进程是最后一个执行(对于 S)将有其 pid 保留下来。

作为一个更扩展和复杂的示例,请考虑并行运行以下"内核:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

在这种情况下,如果我们尝试使用一维索引拆分工作,我们很可能会遇到麻烦:如果 q[i,j,t] 在分配给一个工人的块的末尾附近, q[i,j,t+1] 在分配给另一个块的开头附近,很可能是 q[i,j,t] 在计算需要的时候还没有准备好 q[i,j,t+1]. 在这种情况下,最好手动对数组进行分块。 让我们沿着第二个维度分开。 定义一个函数,返回 (irange,jrange) 分配给此worker的索引:

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # This worker is not assigned a piece
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

接下来,定义内核:

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # display so we can see what's happening
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

我们还为一个 共享阵列 实施情况

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

现在让我们比较三个不同的版本,一个在单个进程中运行:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

一个使用 @分布式:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

还有一个分块委托的:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   Threads.@spawn remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

如果我们创造 共享阵列s和时间这些功能,我们得到以下结果(与 朱莉娅-p4):

julia> q = SharedArray{Float64,3}((500,500,500));

julia>u=SharedArray{Float64,3}((500,500,500));

运行一次函数以进行JIT编译和 @时间他们在第二次运行:

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

的最大优势 平流-共享! 它最大限度地减少了工人之间的流量,允许每个工人在分配的部分上进行长时间的计算。

共享数组和分布式垃圾回收

与远程引用一样,共享数组也依赖于创建节点上的垃圾回收来释放所有参与工作程序的引用。 创建许多短暂的共享数组对象的代码将受益于尽快显式完成这些对象。 这会导致内存和文件句柄映射共享段更快地被释放。

集群管理者

Julia进程的启动、管理和联网到逻辑集群中是通过集群管理器完成的。 A 集群管理员 负责

*在集群环境中启动工作进程 *在每个工人的生存期内管理事件 *可选,提供数据传输

Julia集群具有以下特征:

*最初的Julia过程,也称为 师父,是特别的,有一个 身份证 的1。 *只有 师父 进程可以添加或删除工作进程。 *所有进程都可以直接相互通信。

工作人员之间的连接(使用内置的TCP/IP传输)通过以下方式建立:

* addprocs用一个 集群管理员 对象。 * addprocs调用适当的 推出在适当的机器上生成所需数量的工作进程的方法。 *每个worker开始侦听空闲端口,并将其主机和端口信息写入 标准输出. *集群管理器捕获 标准输出每个工人,并使其可用于主进程。 *主进程解析此信息并设置与每个worker的TCP/IP连接。 *每个worker也会收到集群中其他worker的通知。 *每个工人连接到所有工人 身份证 小于工人自己的 身份证. *通过这种方式建立了网状网络,其中每个工作人员都与其他工作人员直接连接。

而默认传输层使用plain TCPSocket的,Julia集群可以提供自己的传输。

Julia提供了两个内置的集群管理器:

* 本地管理员,使用时 addprocs()addprocs(np::整数)被称为 * Ssh管理员,使用时 addprocs(主机名::数组)用主机名列表调用

本地管理员 用于在同一主机上启动其他工作程序,从而利用多核和多处理器硬件。

因此,一个最小的集群管理器将需要:

*是摘要的子类型 集群管理员 *实施 推出,负责启动新工人的方法 *实施 管理,它在worker生存期的各种事件中被调用(例如,发送中断信号)

addprocs(经理::FooManager)需要 FooManager 实施:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

作为一个例子,让我们看看如何 本地管理员,负责在同一主机上启动worker的管理器,实现:

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

推出方法接受以下参数:

* 经理:ClusterManager:集群管理器 addprocs被称为与 * params::Dict:传递给的所有关键字参数 addprocs * 启动::阵列:追加一个或多个的数组 N.工人,工人 对象 * c::条件:启动worker时要通知的条件变量

推出方法在单独的任务中异步调用。 此任务的终止表明所有请求的工作程序都已启动。 因此, 推出函数必须在所有请求的工作程序启动后立即退出。

新启动的worker以全对全的方式相互连接和主进程。 指定命令行参数 --工人[=<cookie>] 导致启动的进程将自己初始化为工作程序,并通过TCP/IP套接字建立连接。

集群中的所有worker共享相同的 饼干作为主人。 当cookie未指定时,即与 --工人 选项,worker尝试从其标准输入中读取它。 本地管理员Ssh管理员 两者都通过标准输入将cookie传递给新启动的工作人员。

默认情况下,worker将侦听由调用返回的地址处的空闲端口。 盖蒂帕德尔(). 可以通过可选参数指定要侦听的特定地址 --bind-to bind_addr[:端口]. 这对于多宿主主机很有用。

作为非TCP/IP传输的示例,实现可以选择使用MPI,在这种情况下 --工人 不得指定。 相反,新推出的工人应该调用 init_worker(cookie) 前使用任何所述平行构建体。

对于每个启动的工人, 推出方法必须添加一个 N.工人,工人 对象(初始化适当的字段) 推出

mutable struct WorkerConfig
    # Common fields relevant to all cluster managers
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Used when launching additional workers at a host
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # External cluster managers can use this to store information at a per-worker level
    # Can be a dict if multiple fields need to be stored.
    userdata::Any

    # SSHManager / SSH tunnel connections to workers
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Used by Local/SSH managers
    connect_at::Any

    [...]
end

大部分领域 N.工人,工人 被内置的经理使用。 自定义集群管理器通常只指定 伊俄主机 / 港口:

*如果 伊俄 被指定,它用于读取主机/端口信息。 Julia worker在启动时打印出其绑定地址和端口。 这允许Julia worker监听任何可用的空闲端口,而不是要求手动配置worker端口。 *如果 伊俄 未指定, 主机港口 都用于连接。 * 计数, 名称,名称[医]前袋 与从工人启动额外的工人有关。 例如,集群管理器可以为每个节点启动一个工作线程,并使用该工作线程启动其他工作线程。 ** 计数 具有整数值 n 将推出共 n 工人们。 ** 计数 价值为 :自动 将启动与该机器上的CPU线程(逻辑内核)数量一样多的worker。 ** 名称,名称 是名称的 朱莉娅 包括完整路径的可执行文件。 ** [医]前袋 应设置为新工作人员所需的命令行参数。 * 隧道, bind_addr, sshflags最大值,最大值 当需要ssh隧道从主进程连接到工作程序时使用。 * [医]userdata 为自定义群集管理器提供,以存储其自己的特定于工作程序的信息。

管理(管理器::FooManager,id::整数,配置::WorkerConfig,op::符号) 在工人的一生中,在不同的时间被称为适当的 op的 价值:

*与 :登记册/:注销登记 当从Julia worker池中添加/删除worker时。 *与 :中断 何时 中断(工人) 被调用。 该 集群管理员 应用中断信号向适当的工人发信号。 *与 :敲定 用于清理目的。

具有自定义传输的集群管理器

用自定义传输层替换默认的TCP/IP all-to-all套接字连接是一个更复杂的过程。 每个Julia进程都有与其连接的worker一样多的通信任务。 例如,考虑一个全对全网状网络中32个进程的Julia集群:

*因此,每个Julia进程都有31个通信任务。 *每个任务在消息处理循环中处理来自单个远程worker的所有传入消息。 *消息处理循环等待 伊俄 对象(例如,a TCPSocket的在默认实现中),读取整个消息,处理它并等待下一个消息。 *向进程发送消息是直接从任何Julia任务完成的-不仅仅是通信任务-再次,通过适当的 伊俄 对象。

替换默认传输要求新实现设置与远程工作程序的连接并提供适当的 伊俄 消息处理循环可以等待的对象。 要实现的特定于管理器的回调是:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

默认实现(使用TCP/IP套接字)实现为 连接(管理器::ClusterManager,pid::整数,配置::WorkerConfig).

连接 应该返回一对 伊俄 对象,一个用于读取从worker发送的数据 pid,另一个写入需要发送给worker的数据 pid. 自定义集群管理器可以使用内存中 缓冲流,缓冲流 作为自定义之间代理数据的管道,可能不是-伊俄 运输和Julia的内置并行基础设施。

A 缓冲流,缓冲流 是内存中的 xref:base/io-network.adoc#Base.IOBuffer[IOBuffer的 它的行为就像一个 伊俄--它是一个可以异步处理的流。

文件夹 clustermanager/0mq 在https://github.com/JuliaAttic/Examples[Examples repository]包含一个使用ZeroMQ将星形拓扑中的Julia worker与中间的0MQ broker连接起来的示例。 注意:Julia进程仍然全部_logically_相互连接-任何worker都可以直接向任何其他worker发送消息,而不会意识到0MQ被用作传输层。

使用自定义传输时:

*Julia工人不能开始与 --工人. 从 --工人 将导致新启动的worker默认为TCP/IP套接字传输实现。 *对于与worker的每个传入逻辑连接, 基地。process_messages(rd::IO,wr::IO)() 必须打电话。 这将启动一个新的任务,处理从/到由 伊俄 物体。 * init_worker(cookie,manager::FooManager) _must_作为工作进程初始化的一部分被调用。 *领域 connect_at::任何N.工人,工人 可由集群管理器在 推出被调用。 此字段的值传入所有 连接回调。 通常,它将_how to connect_的信息传递给worker。 例如,TCP/IP套接字传输使用此字段指定 (主机,端口) 连接到worker的元组。 kill(管理器,pid,配置) 被调用以从集群中删除一个worker。 在主进程上,相应的 伊俄 对象必须由实现关闭,以确保正确的清理。 默认实现只是执行一个 出口() 调用指定的远程工作程序。

示例文件夹 clustermanager/简单 是一个示例,显示了使用UNIX域套接字进行群集设置的简单实现。

LocalManager和SSHManager的网络要求

Julia集群被设计为在基础设施上已经安全的环境中执行,如本地笔记本电脑、部门集群甚至云。 本节介绍了内置的网络安全要求 本地管理员Ssh管理员:

*主进程不侦听任何端口。 它只与工人联系。 *每个worker仅绑定到其中一个本地接口,并侦听操作系统分配的临时端口号。 * 本地管理员,由 addprocs(N),默认情况下仅绑定到环回接口。 这意味着稍后在远程主机上启动的工作程序(或任何具有恶意意图的人)无法连接到群集。 一个 addprocs(4) 其次是一个 addprocs(["remote_host"]) 会失败。 一些用户可能需要创建一个由其本地系统和一些远程系统组成的集群。 这可以通过显式请求来完成 本地管理员 要通过绑定到外部网络接口 限制;限制 关键字参数: addprocs(4;限制=false). * Ssh管理员,由 addprocs(list_of_remote_hosts),通过SSH在远程主机上启动workers。 默认情况下,SSH仅用于启动Julia workers。 后续的master-worker和worker-worker连接使用普通的、未加密的TCP/IP套接字。 远程主机必须启用无密码登录。 可以通过关键字参数指定其他SSH标志或凭据 sshflags. * addprocs(list_of_remote_hosts;tunnel=true,sshflags=<ssh密钥和其他标志>) 当我们希望为master-worker使用SSH连接时也很有用。 一个典型的场景是本地笔记本电脑运行Julia REPL(即主服务器),其余集群在云上,比如在Amazon EC2上。 在这种情况下,只需要在远程集群上打开端口22,再加上通过公钥基础结构(PKI)进行身份验证的SSH客户端。 身份验证凭据可以通过以下方式提供 sshflags,例如 sshflags=-i<密钥文件>`. 在全对全拓扑(默认)中,所有工作程序通过普通TCP套接字相互连接。 因此,群集节点上的安全策略必须确保临时端口范围(因操作系统而异)的工作程序之间的自由连接。 保护和加密所有工人-工人流量(通过SSH)或加密单个消息可以通过自定义完成 `集群管理员. *如果您指定 多重=真 作为选择 addprocs,SSH复用用于在master和worker之间创建隧道。 如果您已经自行配置了SSH多路复用并且连接已经建立,则无论如何,都使用SSH多路复用 多路复用 选择。 如果启用了多路复用,则使用现有连接设置转发(-O前进 ssh中的选项)。 如果您的服务器需要密码身份验证,这是有益的;您可以通过提前登录到服务器来避免Julia中的身份验证 addprocs. 控制套接字将位于 ~/.ssh/julia-%r@%h:%p 会话期间,除非使用现有的多路复用连接。 请注意,如果在一个节点上创建多个进程并启用多路复用,则带宽可能会受到限制,因为在这种情况下,进程共享单个多路复用TCP连接。

集群Cookie

集群中的所有进程共享同一个cookie,默认情况下,它是主进程上随机生成的字符串:

* cluster_cookie()返回cookie,而 cluster_cookie(cookie)() 设置它并返回新的cookie。 *所有连接都在两侧进行身份验证,以确保只允许由主站启动的worker相互连接。 *Cookie可以在启动时通过参数传递给工人 --工人=<cookie>. 如果参数 --工人 在没有cookie的情况下指定,worker尝试从其标准输入读取cookie(标准普尔). 该 标准普尔 在cookie被检索后立即关闭。 * 集群管理员s可以通过调用来检索master上的cookie cluster_cookie(). 不使用默认TCP/IP传输的集群管理器(因此不指定 --工人)必须打电话 init_worker(cookie,manager) 和主人的饼干一样。

请注意,需要更高级别的安全性的环境可以通过自定义实现这一点 集群管理员. 例如,cookie可以预先共享,因此不指定为启动参数。

指定网络拓扑(实验)

关键字参数 拓扑学 传递给 addprocs用于指定工人必须如何相互连接:

* :all_to_all,默认:所有worker都相互连接。 * :master_worker:只有驱动程序进程,即 pid 1、与工人有联系。 * :海关推出 群集管理器的方法通过字段指定连接拓扑 ident,ident连接_identsN.工人,工人. 具有集群管理器提供身份的worker ident,ident 将连接到指定的所有工人 连接_idents.

关键字参数 懒惰=真/假 只影响 拓扑学 选项 :all_to_all. 如果 真的,集群从连接到所有worker的master开始。 在两个worker之间的第一次远程调用时建立特定的worker-worker连接。 这有助于减少为集群内通信分配的初始资源。 连接的设置取决于并行程序的运行时要求。 默认值 懒惰真的.

目前,在未连接的工作程序之间发送消息会导致错误。 与功能和接口一样,这种行为应该被认为是实验性质的,并且可能在未来的版本中发生变化。

值得注意的外部软件包

在Julia并行之外,有很多外部包应该被提及。 例如,https://github.com/JuliaParallel/MPI.jl[脧锚脧赂`MPI。jl` 是一个朱莉娅的包装器。 MPI 议定书,https://github.com/JuliaParallel/Dagger.jl[脧锚脧赂`匕首jl`]提供类似Python的功能https://dask.org/[Dask],和https://github.com/JuliaParallel/Distributedarrays.jl[脧锚脧赂`分发的。jl`]提供跨worker分布的数组操作,如 上文概述

必须提到Julia的GPU编程生态系统,其中包括:

  1. CUDA.jl封装了各种CUDA库,并支持为Nvidia Gpu编译Julia内核。

  2. oneAPI.jl封装oneAPI统一编程模型,支持在支持的加速器上执行Julia内核。 目前只支持Linux。

  3. AMDGPU.jl封装AMD ROCm库,支持为AMD Gpu编译Julia内核。 目前只支持Linux。

  4. 高级图书馆,如https://github.com/JuliaGPU/KernelAbstractions.jl[KernelAbstractions.jl],https://github.com/mcabbott/Tullio.jl[Tullio.jl]和https://github.com/JuliaComputing/ArrayFire.jl[ArrayFire.jl]。

在下面的例子中,我们将使用两者 分发的。jl库达。jl 通过首先将数组转换为多个进程来分布数组 分发()阵列().

导入时请记住 分发的。jl 要在所有进程中导入它,请使用 @无处不在

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia>使用CUDA

julia>B=ones(10_000)。/ 2;

julia>A=ones(10_000)。&ast; π;

朱莉娅>C=2。&ast;A./B;

朱莉娅>所有(C。≈ 4&ast;π)
真的

朱莉娅>打字(C)
向量资料{Float64} (数组的别名{Float64, 1})

julia>dB=distribute(B);

julia>dA=分发(A);

julia>dC=2。&ast;达。/分贝;

朱莉娅>所有(dC。≈ 4&ast;π)
真的

朱莉娅>打字(dC)
分发的。阵列;阵列{Float64,1,Vector{Float64}}

朱莉娅>幼崽=CuArray(B);

julia>cuA=CuArray(A);

朱莉娅>cuC=2。&ast;cuA。/幼崽;

朱莉娅>所有(cuC。≈ 4&ast;π);
真的

朱莉娅>typeof(cuC)
[医]阵列{Float64,1}

在下面的例子中,我们将使用两者 分发的。jl库达。jl 跨多个进程分发数组并对其调用泛型函数。

function power_method(M, v)
    for i in 1:100
        v = M&ast;v
        v /= norm(v)
    end

    return v, norm(M&ast;v) / norm(v)  # or  (M&ast;v) ./ v
end

力量-力量 重复创建一个新的向量并对其进行规范化。 我们没有在函数声明中指定任何类型签名,让我们看看它是否适用于上述数据类型:

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Vector{Float64}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Vector{Float64}},Float64}

要结束这种对外部封装的短暂暴露,我们可以考虑 MPI。jl,MPI协议的Julia包装器。 由于考虑每个内部功能需要太长时间,因此最好简单地了解用于实现协议的方法。

考虑这个玩具脚本,它简单地调用每个子进程,实例化其rank,当达到主进程时,执行rank’sum

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl

1. 在此上下文中,MPI指的是MPI-1标准。 从MPI-2开始,MPI标准委员会引入了一套新的通信机制,统称为远程内存访问(Rma)。 将rma添加到MPI标准的动机是为了促进片面的通信模式。 有关最新MPI标准的更多信息,请参阅https://mpi-forum.org/docs.