Engee 文档

多处理和分布式计算

模块中实现了对分布式内存系统中并行计算的支持 `Distributed',它包含在Julia标准库中。

大多数现代计算机都有多个Cpu,计算机本身可以组合成集群。 使用多个Cpu的功能可以加快许多计算。 性能受两个主要因素的影响:Cpu本身的速度和内存访问速度。 很明显,在集群中,单个CPU将能够更快地访问其计算机(节点)的RAM。 更令人惊讶的是,在任何多核计算机上都可以观察到类似的情况,因为主存储器的性能差异和https://www.akkadia.org/drepper/cpumemory.pdf [缓存]。 因此,在设计良好的多处理环境中,必须控制特定存储器块与特定CPU的绑定。 Julia提供了一个基于消息的多处理环境:程序可以作为多个进程在不同的内存区域中同时执行。

消息传递在Julia中实现的方式不同于在MPI等其他环境中实现的方式。[在此上下文中,MPI指的是MPI-1标准。 自MPI-2以来,MPI标准制定委员会引入了一套新的互操作性机制,统称为"远程内存访问"(Rma)。 将rma添加到MPI标准的动机是简化单向通信模式。 有关最新MPI标准的更多信息,请参阅https://mpi-forum.org/docs …​]. Julia中的交互通常是片面的。 这意味着在双进程操作的情况下,程序员只显式控制一个进程就足够了。 此外,交互通常不仅以发送和接收消息的形式进行,而且更类似于高级操作,例如对用户函数的调用。

Julia中的分布式编程基于两个原语:远程引用和远程调用。 远程引用是可用于从任何进程访问存储在特定其他进程中的对象的对象。 远程调用是一个进程在另一个(或同一个)进程中调用具有某些参数的函数的请求。

删除的链接可以有两种类型:方法 `Future'和宏 'RemoteChannel'

远程调用返回链接 `Future'对于执行的结果。 远程调用立即返回控制权:当调用本身在其他地方执行时,使其进入下一个操作的进程。 若要等待远程调用结束,请调用该函数 `wait'返回的对象 'Future',并获得完整的结果值,使用方法 '取'

反过来,对象 'RemoteChannel'支持复盖。 例如,多个进程可以通过参考相同的远程信道(`Channel')来协调处理。

每个进程都有一个与其关联的标识符。 提供Julia交互式命令提示符的进程总是`id’为1。 默认情况下用于并行操作的进程称为worker。 当只有一个进程时,进程1被认为是一个worker。 否则,除进程1之外的所有进程都被视为工人。 因此,要利用并行处理方法如 'pmap',必须使用两个或多个进程。 例如,如果您希望主进程在工作线程忙于冗长的计算时继续执行其操作,则添加一个附加进程非常有用。

让我们试着在实践中弄清楚。 使用’julia-p n’命令时,在本地计算机上启动’n’工作流。 通常,使`n`等于计算机上CPU线程(逻辑内核)的数量是有意义的。 请注意,当指定`-p’参数时,模块会自动加载。 '分布式'

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

方法的第一个参数 'remotecall'--被调用的函数。 当并行编程时,Julia通常不会指定特定的进程或可用进程的数量,但是 'remotecall'被认为是提供更详细控制的低级接口。 第二个论点 'remotecall'是将执行工作的进程的’id'。 其余的参数将传递给被调用的函数。

因此,在第一行中,向进程2发送了创建2乘2矩阵的请求,在第二行中,向其添加值1的请求。 两个计算的结果在两个未来的对象中可用’r’和’s'。 宏 '@spawnat'在第一个参数中指定的过程中计算第二个参数中指定的表达式。

有时可能需要立即检索远程计算的值。 当您从远程对象接收下一个本地操作所必需的数据时,通常会发生这种情况。 为此,有一个功能 'remotecall_fetch'。 它相当于函数’fetch(remotecall(…​))+`,但效率更高。

julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085

此代码从工作流2中的数组返回第一个值。 请注意,在这种情况下,`fetch`方法不会移动数据,因为它是在拥有数组的工作流中执行的。 它也可以写成如下。

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866

正如你所记得的,表达 'getindex(r,1,1)' 等价于'r[1,1]`,因此此调用获取Future对象`r’的第一个元素。

为了简化您的任务,您可以将其传递给宏 @spawnatcharacter`:any’用于自动选择工作流。

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

请注意,我们使用了`1。+fetch(r)`而不是`1。+r'。 原因是我们不知道代码将在哪里执行,所以一般来说,将`r’移动到执行添加的过程需要 '取'。 在这种情况下,宏 '@spawnat'知道计算需要在’r’所属的进程中执行,因此 'fetch'将是一个空闲操作(没有工作正在执行)。

(值得注意的是, '@spawnat'--这不是一个内置函数,而是 在Julia中定义。 您可以定义自己的类似结构。)

重要的是要记住,收到对象后 `Future'在本地缓存其值。 进一步的挑战 'fetch'不会导致网络跳转。 收到所有链接后 `Future'远程存储的值被删除。

'@async'类似于宏 '@spawnat',但仅在本地进程中执行任务。 它用于为每个进程创建一个提供程序任务(馈线)。 每个任务获取下一个要计算的索引,等待进程完成,然后重复执行,直到索引用完。 请注意,供应商任务不会开始执行,直到主要任务到达块的末尾。 '@sync'。 此时,它放弃控制并等待所有本地任务完成,之后函数返回控制。 从0.7版本开始,供应商任务可以通过nextidx报告其状态,因为它们都在同一进程中执行。 即使任务是联合规划的,在某些情况下,例如,当 异步I/O,可能需要阻塞。 这意味着上下文切换仅在特定时间发生,在这种情况下,在调用时 'remotecall_fetch'。 这是当前的实现状态,在Julia的未来版本中可能会发生变化。 它的目标是确保在M个进程(`进程')的数量中完成最多N个任务,即模型https://en.wikipedia.org/wiki/Thread_ (计算)#Models[M:N多线程]。 nextidx将需要锁定和释放模型,因为多个进程同时从资源读取和写入资源是危险的。

代码可用性和包下载

您的代码应该可用于执行它的任何进程。 例如,在Julia命令提示符中键入以下内容:

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))))
Stacktrace:
[...]

进程1知道函数`rand2',但进程2不知道。

代码通常从文件或包中加载,您可以灵活地控制代码加载到哪些进程中。 让我们拿’DummyModule’文件。jl’包含以下代码。

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

这样所有进程都可以引用’MyType`,'DummyModule’文件。jl’必须加载到每个进程中。 当调用’include("DummyModule.jl")'它只加载到一个进程中。 要将文件上载到每个进程,请使用宏 @everywhere(Julia应该用命令’julia-p2’启动)。

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

像往常一样,`DummyModule’模块不包括在进程范围内。 这需要使用关键字。 '使用''导入'。 而且,当`DummyModule’在一个进程中注入到作用域中时,它不会在其他进程中发生。:

julia> using .DummyModule

朱莉娅>MyType(7)
类型(7)

julia>fetch(@spawnat2MyType(7))
错误:在worker2上:
UndefVarError:`Mytype`未在'Main'中定义
⋮

julia>fetch(@spawnat2DummyModule.MyType(7))
类型(7)

但是,您仍然可以将`MyType`传递给加载了`DummyModule`模块但未包含在作用域中的进程。

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

该文件还可以在启动时使用`-L`标志预加载到多个进程中,并且可以使用激活脚本来激活其计算。

julia -p <n> -L file1.jl -L file2.jl driver.jl

在上面的示例中,执行激活脚本的Julia进程具有值为1的"id",提供交互式命令提示符的进程也是如此。

最后,如果’DummyModule。jl’不是一个单独的文件,而是一个包。 'Using DummyModule’运算符加载DummyModule。jl’包含在所有进程中,但仅在调用运算符的进程中包含作用域。 '使用'

启动和管理工作流

基本Julia安装支持两种类型的集群:

  • 使用`-p’参数创建的本地集群,如上所示;

  • 使用`--machine-file`参数创建跨越多台计算机的群集。 要在指定的计算机上运行Julia工作流(从当前主机的路径),使用ssh登录而无需密码。 每台计算机都以以下形式指定:`[count*][user@]host[:port][bind_addr[:port]]'。 默认情况下,'user’是当前用户,`port’是标准SSH端口。 'count’是节点上产生的工作进程数(默认为1)。 可选的`bind-to bind_addr[:port]'元素定义了其他工作进程应用于连接到此进程的IP地址和端口。

虽然Julia中通常提供向后兼容性,但跨工作流的代码分发是使用以下方法完成的 '序列化。序列化'。 如相关文档所示,在使用不同版本的Julia时,此机制可能不起作用,因此最好所有计算机上的所有工作流都使用相同的版本。

以下函数可用于以编程方式添加、删除和查询群集中的进程: 'addprocs', 'rmprocs', '工人'和其他人。

julia> using Distributed

朱莉娅>addprocs(2)
2元素数组{Int64,1}:
 2
 3

挑战前 `addprocs'模块 'Distributed'必须显式加载到主进程中。 它在工作流中自动可用。

请注意,工作进程不执行启动脚本'~/。朱莉娅/配置/启动。jl’并且不与其他正在运行的进程同步它们的全局状态(包括命令行参数、全局变量、新方法定义和加载的模块)。 您可以使用’addprocs(exeflags="--project")初始化具有特定环境的工作流,然后使用<modulename>运行命令@everywhere`或`@everywhere include("file。jl")`。

要支持其他类型的集群,您可以编写ClusterManager的自定义实现,如下所述 ClusterManager对象

移动数据

在分布式程序中,大部分工作都是在发送消息和移动数据上完成的。 为了确保高性能和可扩展性,最大限度地减少消息数量和数据传输量非常重要。 为此,您需要了解Julia中的各种分布式编程结构如何移动数据。

'fetch'可以被视为显式的数据传输操作,因为将对象移动到本地计算机是直接请求的。 宏 '@spawnat'(以及一些相关的构造)也移动数据,但以不太明显的方式移动,因此这可以称为隐式数据移动操作。 让我们考虑两种方法来创建随机矩阵并对其进行平方。

方法1

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

方法2

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

朱莉娅>fetch(Bref);

差异似乎微不足道,但由于宏的行为,它实际上会产生显着差异。 '@spawnat'。 在第一种方法中,在本地创建一个随机矩阵,然后转移到另一个过程,在那里它被平方。 在第二种方法中,在另一个过程中创建和平方随机矩阵。 因此,在第二种情况下传输的数据比在第一种情况下少得多。

在这个最简单的例子中,两种方法之间的区别是显而易见的,因此很容易选择正确的方法。 然而,在真实世界的程序中,可能需要仔细考虑移动数据并可能进行测量。 例如,如果矩阵`A`是第一过程需要的,则使用第一方法可能更好。 如果计算矩阵’A’需要大量资源并且只有当前进程具有它们,那么它将不可避免地必须移动到另一个进程。 最后,如果当前进程在调用之间几乎没有任何关系 '@spawnat'和’fetch(Bref)',完全放弃并行性可能会更好。 让我们也想象一下,而不是’兰德(1000,1000)"我们有一个更复杂的操作。 在这种情况下,添加另一个运算符可能是有意义的。 `@spawnat'专门为她。

全局变量

由远程执行的表达式 '@spawnat',或使用以下方法分配给远程执行的闭包 'remotecall',可以引用全局变量。 "Main"模块中的全局绑定的处理方式与其他模块中的全局绑定略有不同。 考虑下面的代码片段。

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

在这种情况下,功能 sum'必须在远程进程中定义。 请注意’A’是在本地工作区中定义的全局变量。 工作流2在"Main"中没有名为"A"的变量。 将闭包()->sum(A)`发送到工作流2会使其定义`Main。一个'。 ""主啊。即使在呼叫控制返回后,a’仍继续存在于工作流2中。 'remotecall_fetch'。 具有嵌入式全局引用的远程调用(仅在`Main’模块中)按如下方式管理全局变量。

  • 如果作为远程调用的一部分进行访问,则会在目标工作流中创建新的全局变量。

  • 全局常量也被声明为远程节点上的常量。

  • 全局变量仅在远程调用的上下文中传递回目标工作流,并且仅在值发生更改时传递回目标工作流。 此外,群集节点之间不会同步全局绑定。 例如:

    A = rand(10,10)
    remotecall_fetch(()->sum(A), 2) # рабочий процесс 2
    A = rand(10,10)
    remotecall_fetch(()->sum(A), 3) # рабочий процесс 3
    A = nothing

    由于上面的代码片段,变量是’Main。A’在工作流2和’主。工作流3中的a’将具有不同的值,并且’Main。节点1上的a`取值’nothing'。

正如您可能已经猜到的那样,当在主进程中重新分配全局变量时,可以清除与全局变量相关联的内存,但在工作流中不会发生这种情况,因为绑定继续运 如果不再需要远程节点中的某些全局变量,则可以使用该方法将它们设置为"无" '清除!`. 结果,在下一个垃圾回收周期期间释放关联的内存。

因此,在远程调用中应谨慎引用全局变量。 如果可能的话,完全避免它会更好。 如果仍然需要引用全局变量,建议使用`let`块来本地化它们。

例如:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia>@fetchfrom2InteractiveUtils.varinfo()
名称大小摘要
––––––––– ––––––––– ––––––––––––––––––––––
一个800字节10×10数组{Float64,2}
基本模块
核心模块
主模块

如您所见,全局变量`A`在工作流2中定义,但`B`被捕获为局部变量,因此在工作流2中没有`B`的绑定。

并行映射和循环

幸运的是,对于许多有用的并行计算,没有必要移动数据。 一个常见的例子是蒙特卡罗模拟,其中几个过程可以同时进行独立的模型测试。 我们可以使用 '@spawnat'在两个过程中翻转硬币。 首先,让我们在’count_heads中编写以下函数。jl’文件。

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

'Count_heads’函数简单地将`n`个随机位相加。 如下所示,您可以在两台计算机上运行测试,然后将结果相加。

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

此示例演示了一种常见且非常有效的并行编程模式。 迭代在多个过程中独立执行,然后使用函数组合它们的结果。 组合的过程称为减少,因为它通常涉及降低张量的化合价:将数字的向量减少到单个数字,或者将矩阵减少到单个行或列等。 在代码中,这通常看起来像模式’x=f(x,v[i])`,其中`x`是加法器,`f`是减少函数,`v[i]`是减少的元素。 希望函数’f’是关联的。:这样,操作可以按任何顺序执行。

请注意,我们的这种模式与`count_heads`相关的用例可以概括。 我们使用了两个显式运算符 '@spawnat',因此并行性仅限于两个进程。 要在任意数量的进程中运行,可以使用在分布式内存中运行的_parallel for_循环。 在Julia中,可以使用 '@distributed'如下。

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

这种构造实现了将迭代分配给多个过程的模式,然后使用指定的减少(在这种情况下,(+))组合它们的结果。 每次迭代的结果是循环内最后一个表达式的值。 作为计算整个并行循环表达式的结果,得到最终答案。

请记住,虽然并行for循环类似于顺序循环,但它们的行为根本不同。 特别是,迭代不会以特定的顺序发生,并且分配给变量或数组的值不会全局可用,因为迭代是在不同的过程中执行的。 在并行循环内部使用的变量被复制并传输到每个进程。

例如,下面的代码将无法正常工作。

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

此代码不会初始化`a`的所有元素,因为在每个进程中都会有一个单独的副本。 应该避免这样的循环并行。 幸运的是,可以使用以下方法规避此限制 共享数组

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

如果变量是只读的,则在并行循环中使用外部变量是完全合理的。

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

这里,在每次迭代中,函数’f`被应用于向量`a’的一个随机选择的元素,该元素被所有进程共享。

如您所见,如果不需要减少运算符,则可以省略它。 在这种情况下,循环异步运行,即它在所有可用的工作流中生成独立的任务,并立即返回一个对象数组。 'Future',而无需等待执行完成。 调用者可以等待对象完成。 `未来'更远的地方,呼唤他们 'fetch',或者在循环结束时等待执行完成,为此您应该在它之前指定 '@sync',例如,'@sync@distributed for'。

在某些情况下,如果我们只想将函数应用于某个范围内的所有整数(或者更一般地应用于某些集合的所有元素),则不需要减少运算符。 这是另一个名为_parallel matching_的有用操作。 它是通过函数在Julia中实现的 'pmap'。 例如,可以如下计算并行模式下的几个大随机矩阵的奇异值。

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

功能 'pmap'是为每个函数调用执行大量工作的情况而设计的。 相反’@distributed for’适用于每次迭代非常简单的情况,甚至到两个数字简单地加在一起的地步。 对于并行计算使用 'pmap'和'@distributed for’仅使用工作流。 在'@distributed for’的情况下,在调用过程中执行最终减少。

删除的链接和AbstractChannel对象

删除的链接始终与"AbstractChannel"的实现相关联。

在`AbstractChannel`(例如,Channel)的特定实现中,必须实现这些方法 '放!, '拿!, '取', 已经准备好''等待'。 引用的已删除对象 `Future',存储在'+Channel{Any}(1)+,即在大小为1的通道中,用于类型为"Any"的对象。

一个对象 'RemoteChannel`可以指向任何类型和大小的通道,或`AbstractChannel’的任何其他实现。

'RemoteChannel(f)构造函数::Function,pid)()`允许您创建指向包含多个特定类型值的通道的链接。 'f’是在’pid’进程中执行的函数。 它应该返回’AbstractChannel’对象。

例如,表达式'RemoteChannel(()->Channel{Int}(10),pid)`返回对大小为10的`Int`类型通道的引用。 通道存在于’pid’工作流中。

方法 '放!, '拿!, '取', `已经准备好''等待'对象 `RemoteChannel'被代理到远程进程的辅助存储。

因此, 'RemoteChannel`可用于引用用户实现的`AbstractChannel’对象。 一个简单的例子是下面的’DictChannel',它使用字典作为远程存储。:

julia> struct DictChannel{T} <: AbstractChannel{T}
           d::Dict
           cond_take::Threads.Condition    # ожидание доступных данных
           DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
           DictChannel() = DictChannel{Any}()
       end

julia> begin
       function Base.put!(D::DictChannel, k, v)
           @lock D.cond_take begin
               D.d[k] = v
               notify(D.cond_take)
           end
           return D
       end
       function Base.take!(D::DictChannel, k)
           @lock D.cond_take begin
               v = fetch(D, k)
               delete!(D.d, k)
               return v
           end
       end
       Base.isready(D::DictChannel) = @lock D.cond_take !isempty(D.d)
       Base.isready(D::DictChannel, k) = @lock D.cond_take haskey(D.d, k)
       function Base.fetch(D::DictChannel, k)
           @lock D.cond_take begin
               wait(D, k)
               return D.d[k]
           end
       end
       function Base.wait(D::DictChannel, k)
           @lock D.cond_take begin
               while !isready(D, k)
                   wait(D.cond_take)
               end
           end
       end
       end;

julia> d = DictChannel();

julia> isready(d)
false

julia> put!(d, :k, :v);

julia> isready(d, :k)
true

julia> fetch(d, :k)
:v

julia> wait(d, :k)

julia> take!(d, :k)
:v

julia> isready(d, :k)
false

远程通道和对象

  • 一个对象 `Channel'是进程的本地。 工作流2无法直接引用 '通道'(反之亦然)。 然而 'RemoteChannel'可以在工作流之间发送和接收值。

  • 一个对象 'RemoteChannel`可以表示为对象描述符 '通道'

  • 与之关联的进程ID`pid 'RemoteChannel',定义一个进程,其中存在辅助存储,即辅助对象 '通道'

  • 任何带有链接的进程 'RemoteChannel'可以将项目发送到通道并从通道接收它们。 数据自动发送到(或从)与对象关联的进程中检索。 'RemoteChannel`

  • 序列化通道时 'Channel'也序列化其中的所有数据。 因此,在反序列化它时,会创建原始对象的副本。

  • 相反,在序列化对象时 'RemoteChannel`只有定义位置和实例的标识符被序列化 Channel。 因此,反序列化的对象 'RemoteChannel'(在任何工作流中)指向与原始存储相同的辅助存储。

可以更改上述通道示例,以便进行进程间通信,如下所示。

启动四个工作流来处理单个远程作业通道。 由`job_id`标识的作业记录在通道中。 在这个例子中,每个远程执行的任务读取’job_id`,等待随机的时间量,并将一个由`job_id`,超时和它自己的`pid’组成的元组写入结果通道。 最后,主进程从"结果"通道输出所有数据。

julia> addprocs(4); # Добавляем рабочие процессы

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> @everywhere function do_work(jobs, results) # Определяем рабочую функцию через everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # Имитирует время, затрачиваемое на реальную работу
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий

julia> for p in workers() # Запускаем в рабочих процессах задачи для параллельной обработки запросов
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # Выводим результаты
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

删除的链接和分布式垃圾回收

删除链接指向的对象只有在删除集群中的所有链接后才能清理。

存储值的节点监视引用它的工作进程。 每次对象 'RemoteChannel'`Future'(其收据已被取消)序列化到工作流中,并且链接指向的节点会收到通知。 拥有该值的节点在每次对象时也会收到通知 'RemoteChannel'`Future'(其收据已被取消)由垃圾收集器清除。 这些操作在具有群集支持的内部序列化程序中实现。 删除的链接仅在集群执行的上下文中有效。 不支持将引用序列化为常规’IO`对象并从中反序列化。

发送跟踪消息以进行通知:有关在链接序列化到另一个进程时添加链接的消息,以及有关在垃圾收集器本地清理链接时删除链接的消息。

由于对象 `Future'被写入一次,并在应用时缓存在本地 `取''Future'还更新有关跟踪值所属节点上的链接的信息。

拥有该值的节点在清除对其的所有引用后将其删除。

物的情况下 `未来',已经收到的序列化 'Future'还伴随着将值转移到另一个节点,因为此时收集器已经可以清除原始远程存储中的值。

需要注意的是,垃圾回收器在本地清理的对象取决于对象的大小和系统上当前的内存可用性。

在远程链接的情况下,本地链接对象的大小相当小,而存储在远程节点中的值可以相当大。 由于本地对象可能不会立即被收集器清理,因此建议显式调用 finalize'本地实例 'RemoteChannel'或对象 `未来',其收据已被取消。 从打电话时起 'fetch' 'Future'也从远程存储中删除其引用,对接收到的对象执行此操作。 '未来不是必需的。 明确的挑战 `finalize'导致立即向远程节点发送消息,指出需要删除对值的引用。

消除后,链接变为无效,不能在后续调用中使用。

本地电话

数据不可避免地被复制到远程调用执行。 这种情况发生在远程调用期间和数据保存在 'RemoteChannel'`Future'在另一个节点中。 正如预期的那样,这导致在远程节点中形成序列化对象的副本。 然而,当目的节点是本地节点,即调用进程的ID与远程节点的ID匹配时,调用作为本地执行。 它通常(但不总是)在另一个任务中执行,但没有对数据进行序列化或反序列化。 因此,调用引用正在传递的对象的相同实例,但不会创建副本。 此行为如下所示。

julia> using Distributed;

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel создается в локальном узле

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # Используем `v` повторно
           put!(rc, v)
       end;

julia>结果=[拿!(rc)对于_在1:3];

julia>println(结果);
阵列{Int64,1}[[3], [3], [3]]

julia>println("Num Unique objects:",length(unique(map(objectid,result))));
Num唯一对象:1

朱莉娅>addprocs(1);

julia>rc=RemoteChannel(()->Channel(3),workers()[1]);#RemoteChannel在远程节点中创建

朱莉娅>v=[0];

朱莉娅>对于我在1:3
           v[1]=i
           放!(rc,v)
       结束;

julia>结果=[拿!(rc)对于_在1:3];

julia>println(结果);
阵列{Int64,1}[[1], [2], [3]]

julia>println("Num Unique objects:",length(unique(map(objectid,result))));
Num唯一对象:3

从例子中可以看出,方法调用 '放!'为本地频道 'RemoteChannel`当对象`v’在调用之间更改时,保存对象的相同实例。 当拥有`rc`的节点不同时,会创建`v`的副本。

应该指出的是,作为一项规则,这是没有问题的。 只有当对象在本地保存并在调用后更改时,才应考虑到这一点。 在这种情况下,您可能需要保留对象的"deepcopy"。

对于本地节点上的远程调用也是如此,如下例所示。

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # Выполняется в локальном узле

julia>println("v=$v,v2=$v2,",v===v2);
v=[1],v2=[1],真

朱莉娅>v=[0];

julia>v2=remotecall_fetch(x->(x[1]=1;x),workers()[1],v);#在远程节点上运行

julia>println("v=$v,v2=$v2,",v===v2);
v=[0],v2=[1],false

正如您在本例中看到的,对本地节点的远程调用与直接调用的工作方式相同。 它修改作为参数传递的本地对象。 进行远程调用时,使用参数的副本执行操作。

让我们再次重申,作为一项规则,这是没有问题的。 如果本地节点也用作计算节点,并且在调用之后使用参数,则应考虑此功能,并在必要时将参数的深层副本传递给本地节点中进行的调用。 对远程节点的调用始终使用参数副本。

共享数组

共享数组使用系统的共享内存跨多个进程映射同一个数组。 `SharedArray'是一个不错的选择。 对共享数组的支持由`SharedArrays`模块提供,该模块必须显式加载到所有参与进程中。

一个额外的数据结构由外部包提供https://github.com/JuliaParallel/DistributedArrays.jl ['DistributedArrays.jl']作为’DArray'。 虽然在某些方面,对象类似于 `SharedArray',行为https://github.com/JuliaParallel/DistributedArrays.jl ['DArray']在许多方面都是不同的。 的情况下 'SharedArray'每个处理它的进程都可以访问整个数组。 相反,在对象的情况下https://github.com/JuliaParallel/DistributedArrays.jl ['DArray']每个进程只能对一部分数据进行本地访问,而这些部分对于所有进程都是不同的。

'SharedArray'被索引(为了赋值和访问它们的目的),就像一个常规数组一样,这是有效的,因为相应的内存区域可供本地进程使用。 因此,大多数算法自然与 'SharedArray',尽管是单进程模式。 如果算法需要输入数组 `Array',基数组可以从 `SharedArray'通过调用函数 'sdata'。 对于其他类型的’AbstractArray’函数 'sdata'只是返回对象本身,所以 'sdata'可以安全地应用于`Array`类型的任何对象。

共享数组的构造函数如下所示。

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

它创建一个位类型为"T"和大小为"dims"的"N"维共享数组,可供"pid"中指定的进程访问。 与分布式数组不同,共享数组只能从命名参数"pid"中指定的参与进程访问(如果此进程在同一主机上运行,则可以从创建它的进程访问)。 请注意,SharedArray中仅支持类型的元素。 'isbits'

如果`init`函数使用签名`initfn(S::SharedArray)'指定,则在所有参与的工作流中调用它。 您可以指定每个工作流应为数组的单独部分执行`init`函数,以便并行化初始化。

下面是一个小例子。

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

功能 'SharedArrays。localindices'提供单独的一维索引范围,有时用于在进程之间划分任务。 当然,你可以根据自己的喜好划分工作。

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

由于基础数据可供所有进程使用,因此确保不存在冲突非常重要。 例如:

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

会导致未定义的行为。 由于每个进程都用自己的`pid`填充整个数组,因此结果将是最后执行的进程的`pid`(对于任何`S`元素)。

为了考虑一个更复杂的例子,让我们并行运行下面的内核。

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

在这种情况下,如果我们尝试使用一维索引来划分工作,则很可能会出现麻烦:如果`q[i,j,t]`接近分配给一个工作流的块的末尾,并且`q[i,j,t+1]`接近块的开 分配给另一个进程,有很高的概率`q[i,j,t]`的部分在需要计算`q[i,j,t+1]`的时候不会准备好。 在这种情况下,最好手动将阵列分成几个部分。 让我们将数组除以第二维。 定义一个函数,返回分配给此工作流的索引'(irange,jrange)'。

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # Этому рабочему процессу часть не назначена.
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

接下来,我们定义核心。

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # Выведем на экран, чтобы знать, что происходит.
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

我们还将为`SharedArray’的实现定义一个辅助包装器。

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

现在让我们比较三个不同的版本。 在同一进程中运行:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

使用 '@分布式':

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

将工作分成几个部分:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

如果我们创建’SharedArray’数组并测量这些函数的运行时间,我们将得到以下结果(当使用`julia-p4`命令时)。

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

让我们执行一次函数来执行JIT编译,然后应用宏 `@time'第二次执行。

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

"平流"的主要优势! 原因是工作流之间的流量被最小化,这样每个过程都可以长时间与分配的部件一起工作。

共享数组和分布式垃圾回收

就像已删除的引用一样,共享数组依赖于创建它们的节点上的垃圾收集器来清理所有参与工作流中的引用。 在创建许多短期共享数组的代码中,建议尽快显式消除这些对象。 这将使得清除与共享段相关联的存储器和文件描述符的速度更快。

ClusterManager对象

Julia进程使用集群管理器启动、管理并组合到一个逻辑集群中。 ClusterManager负责以下操作:

  • 在集群环境中运行工作流;

  • 每个工作流生命周期内的事件管理;

  • 必要时确保数据传输。

Julia中的集群具有以下特性。

  • 最初的Julia进程,也称为master,是特殊的,'id’为1。

  • 只有"主"进程才能添加或删除工作进程。

  • 所有进程都可以直接相互交互。

工作流之间的连接(使用内置的TCP/IP传输)建立如下。

количество рабочих процессов на соответствующих компьютерах.
  • 每个工作进程开始侦听空闲端口,并将有关其主机和端口的信息写入流。 'stdout'

  • 群集管理器捕获流 'stdout',并使其可用于主进程。

  • 主进程分析此信息并建立到每个工作流的TCP/IP连接。

  • 每个工作流还会收到有关群集中其他工作流的通知。

  • 每个工作流连接到"id"小于此工作流的"id"的所有工作流。

  • 这将创建一个多连接网络,其中每个工作流都直接连接到所有其他工作流。

虽然默认情况下,传输层使用常规套接字。 `TCPSocket',Julia集群可以提供自己的传输。

Julia有两个内置的集群管理器。:

"LocalManager"用于在同一主机上运行其他工作流,允许使用多核和多处理器硬件。

因此,群集管理器必须至少满足以下要求
  • 必须是抽象类型’ClusterManager`的子类型;

  • 必须实现该方法 'launch',负责启动新的工作流程;

  • 必须实现该方法 'manage',其响应于工作流存在期间的各种事件(例如,发送中断信号)而被调用。

'addprocs(manager::FooManager)`需要在`FooManager`中实现以下方法。

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

例如,让我们看看"LocalManager"管理器是如何实现的,它负责在同一主机上运行工作流。

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

方法 `launch'接受以下参数:

  • 'manager::ClusterManager`:正在调用的集群管理器 'addprocs';

  • 'params::Dict`:所有命名参数都通过 'addprocs';

  • 'launched::Array':向其中添加一个或多个`WorkerConfig`对象的数组;

  • `c::Condition':用于通知工作流启动的条件变量。

方法 `launch'在单独的任务中异步调用。 此任务的完成表明所有请求的工作流正在运行。 因此,退出函数 'launch'

正在运行的工作流根据"全部到所有"方案相互连接并连接到主进程。 如果指定命令行参数'--worker[=<cookie>]`,正在运行的进程将自己初始化为worker,并通过TCP/IP套接字建立连接。

集群中的所有工作流都使用相同的 cookie文件,这是主要的过程。 如果未指定cookie(使用'--worker`选项时),工作流会尝试从标准输入流中读取它。 'LocalManager’和’SSHManager’通过其标准输入流将cookie传递给新的工作流。

默认情况下,工作流侦听作为调用结果返回的地址处的空闲端口。 'getipaddr()。 使用可选的--bind-to bind_addr[:port]'参数,可以指定特定的监听地址。 如果主机连接到多条物理数据线,这会很有用。

实现中的非TCP/IP传输的一个示例是MPI。 在这种情况下,不能指定'--worker’参数。 相反,新的工作进程应该在使用任何并行构造之前调用`init_worker(cookie)`。

对于每个正在运行的工作流,方法 `launch'必须将`WorkerConfig`对象添加到`launched'(初始化相应的字段)。

mutable struct WorkerConfig
    # Поля, общие для всех диспетчеров кластеров
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Используется при запуске дополнительных рабочих процессов на хосте
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # Внешние диспетчеры кластеров могут использовать этот объект для хранения информации на уровне отдельного рабочего процесса
    # Для хранения нескольких полей можно использовать словарь.
    userdata::Any

    # Подключения к рабочим процессам через SSHManager или туннель SSH
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Используется локальными диспетчерами или диспетчерами SSH
    connect_at::Any

    [...]
end

'WorkerConfig’中的大多数字段都由内置调度程序使用。 用户集群管理器通常只指定"io"或"主机"和"端口"。

  • 如果设置了"io"字段,则会从中读取有关主机和端口的信息。 Julia工作流在启动时输出其绑定地址和端口。 这允许Julia工作流侦听任何可用端口,而不是手动配置工作流的端口。

  • 如果未指定’io’字段,则使用’host`和’port’字段进行连接。

  • 需要字段’count','exename’和’exeflags’才能从此工作流运行其他工作流。 例如,群集管理器可以在节点上启动一个工作流,并使用它运行其他工作流。 如果’count’字段的整数值为’n`,则总共启动`n’个工作进程。 如果’count’字段具有值`:auto',则正在运行的工作流数等于计算机上的CPU线程(逻辑内核)数。 'Exename’字段包含’julia’可执行文件的名称,包括完整路径。 "Exeflags"字段指定应使用其启动工作流的命令行参数。

  • 当需要SSH隧道从主隧道连接到工作流时,将使用字段"隧道"、"bind_addr"、"sshflags"和"max_parallel"。

  • 为用户集群管理器提供了"userdata"来存储他们自己的工作流信息。

'Manage(manager’method::FooManager,id::Integer,config::WorkerConfig,op::Symbol)'在工作流存在期间的不同点调用,其对应值为’op`:

  • 在Julia工作流池中添加或删除进程时,使用值`:register`或`:deregister`。

  • 当调用"中断(工人)"时,值为":中断"。 ClusterManager调度器必须向相应的工作流发送中断信号。;

  • 值为":finalize"用于清洁目的。

具有自定义传输的集群管理器

使用自定义传输层替换通过TCP/IP套接字的默认全对全连接需要更多的努力。 Julia进程的交互任务数等于它所连接的工作流数。 例如,考虑一个Julia集群,在一个全对全多连接网络中有32个进程。

  • 因此,每个Julia进程都有31个交互任务。

  • 每个任务通过消息处理循环处理来自单个远程工作流的所有传入消息。

  • 消息处理循环正在等待’IO’对象(例如, `TCPSocket'在默认实现中),读取整个消息,处理它,并等待下一个消息。

  • 消息直接从任何Julia任务发送到进程-不仅仅是从交互任务-再次通过相应的’IO`对象。

新的实现取代了默认的传输,应该建立与远程工作流的连接,并提供消息处理周期可以期望的适当的"IO"对象。 实现需要以下与dispatcher相关的回调。

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

默认实现(使用TCP/IP套接字)看起来像’connect(manager::ClusterManager,pid::Integer,config::WorkerConfig)`。

'Connect’方法应该返回一对’IO`对象:其中一个旨在读取从`pid`工作流发送的数据,另一个旨在写入发送到`pid’工作流的数据。 用户集群管理器可以使用内存中的BufferStream对象作为用户传输之间数据传输的中介,可能没有IO对象,以及用于并行处理的内置Julia基础结构。

'BufferStream’是放置在内存中的对象。 'IOBuffer',它作为`IO’工作:它是一个可以异步处理的流。

在文件夹’clustermanager/0mq’中https://github.com/JuliaAttic/Examples [Examples repositories]有一个使用ZeroMQ将Julia使用zvezda拓扑与中心的0mq broker连接的示例。 注意。 所有Julia进程仍然在逻辑上相互连接-其中任何一个都可以直接向任何其他进程发送消息,而不考虑到0MQ用作传输层。

使用自定义传输时,请考虑以下功能。

  • Julia工作流不应使用`--worker`参数启动。 当使用'--worker`参数启动时,新的工作进程默认使用基于TCP/IP套接字的传输。

  • 对于工作流的每个传入逻辑连接,有必要调用’Base。process_messages(rd::IO,wr::IO)()'。 因此,将启动一个新任务,负责读取和写入发送到工作流和从工作流发送并由"IO"对象表示的消息。

  • 作为工作流初始化的一部分,不需要调用’init_worker(cookie,manager::FooManager)'。

  • 'WorkerConfig’中的`connect_at::Any’字段可以由集群管理器在调用时设置 '发射'。 此字段的值在所有回调中传递。 '连接'。 通常,它包含有关如何连接到工作流的信息。 例如,基于TCP/IP套接字的传输使用此字段指定元组`(主机,端口)`,即连接到工作流的主机和端口。

要从集群中删除工作流,调用’kill(manager,pid,config)`方法。 为了正确清洁,必须在主进程中关闭相应的"IO"对象。 默认实现只是对指定的远程工作流执行`exit()`调用。

Examples存储库中的’clustermanager/simple’文件夹中有一个示例,演示了使用UNIX域套接字配置集群的简单实现。

LocalManager和SSHManager的网络要求

Julia集群的设计是为了在已经安全的环境中工作,这些环境基于基础设施,如本地计算机、部门集群甚至云。 本节专门讨论内置调度程序"LocalManager"和"SSHManager"的网络安全要求。

  • 主进程不侦听任何端口。 它只是连接到工作流。

  • 每个工作流仅绑定到一个本地接口,并侦听一个临时端口,该端口的数量由操作系统分配。

  • Addprocs(N)'函数使用的`LocalManager’管理器仅在默认情况下绑定到loopback接口。 这意味着稍后在远程主机上启动的工作进程(可能由攻击者)无法连接到群集。 如果在`addprocs(4)`之后调用`addprocs(["remote_host"]),它将失败。 有时用户可能需要创建一个由其本地系统和多个远程系统组成的集群。 为此,您可以使用命名参数"restriction":"addprocs(4;restriction=false)"显式请求将"LocalManager"绑定到外部网络接口。

  • `SSHManager`管理器由`addprocs(list_of_remote_hosts)'函数使用,通过SSH协议在远程主机上运行工作流。 默认情况下,SSH协议仅用于运行Julia工作流。 对于工作进程之间以及它们与主进程之间的进一步通信,使用没有加密的常规TCP/IP套接字。 必须在远程主机上启用无密码登录。 可以使用命名的"sshflags"参数指定其他SSH标志或凭据。

  • 如果SSH连接也需要用于主进程和工作进程之间的交互,则调用`addprocs(list_of_remote_hosts;tunnel=true,sshflags=<ssh keys and other flags>)非常有用。 一个典型的例子是Julia REPL环境(即主进程)在本地计算机上运行,而集群的其余部分在Amazon EC2等云中运行。 在这种情况下,只需要在远程集群中打开端口22,SSH客户端使用公钥基础结构(Pki)进行身份验证。 凭据可以使用`sshflags’提供,例如$sshflags=-i<keyfile>`$。

    在全对全拓扑(默认情况下)中,所有工作进程通过常规TCP套接字相互连接。 因此,群集节点上的安全策略应确保可以使用一系列临时端口(取决于操作系统)在工作流之间自由连接。

    工作流之间的所有流量的保护和加密(通过SSH)或单个消息的加密可以由用户调度程序’ClusterManager’进行。

  • 如果函数`multiplex=true’与参数一起调用 addprocs',SSH复用在主进程和工作进程之间创建隧道。 如果您自己配置了SSH多路复用并且连接已经建立,则无论`multiplex`参数的值如何,都会使用SSH多路复用。 如果启用了多路复用,则使用现有连接配置转发(SSH中的-O forward’选项)。 如果您的服务器需要基于密码的身份验证,这可能很有用:可以通过在调用之前登录到服务器来避免Julia身份验证 'addprocs'。 控制套接字将位于'~/。ssh/julia-%r@%h:%p’在会话期间,除非使用现有的多路复用连接。 请注意:如果在一个节点上创建多个进程并启用多路复用,则带宽可能会受到限制,因为在这种情况下,进程与多路复用共享单个TCP连接。

集群cookie

集群中的所有工作进程都使用相同的cookie,默认情况下是主进程中随机生成的字符串。

его и возвращает новый файл cookie.
  • 所有连接都在双方进行身份验证,以便只有主进程启动的工作进程才能相互连接。

  • 当工作流使用`--worker=<cookie>`参数启动时,cookie可以传递给工作流。 如果在不指定cookie的情况下使用`--worker’参数,则工作流会尝试从标准输入流中读取cookie('stdin')。 收到cookie后,`stdin’流立即关闭。

  • ClusterManager可以通过调用方法从主进程接收cookie文件 'cluster_cookie()。 默认情况下不使用TCP/IP传输(因此不指定--worker`参数)的集群管理器应使用主进程的cookie文件调用`init_worker(cookie,manager)`。

请注意,在安全性要求较高的环境中,可能会使用`ClusterManager’的自定义实现。 例如,cookie可能会提前提供,因此不会在启动时指定为参数。

指定网络拓扑(实验功能)

传递给函数的命名参数"拓扑" `addprocs',定义工作流应如何相互连接:

  • ':all_to_all'(默认值)--所有工作进程相互连接;

  • ':master_worker'--只有主进程,即pid为1的进程连接到工作进程;

  • :custom--集群管理器的`launch’方法使用`WorkerConfig`中的`ident`和`connect_idents’字段设置连接拓扑。 集群管理器提供的标识符为"ident"的工作进程连接到"connect_idents"中指定的所有工作进程。

命名参数’lazy=true/false`只有当`topology`参数具有值`:all_to_all’时才有效。 如果该值设置为"true",则在群集启动后,主进程将立即连接到所有工作程序。 特定工作流之间的连接是在它们之间的第一次远程调用上建立的。 这有助于减少最初为群集内的交互分配的资源量。 连接的建立取决于并行程序在运行时的需要。 "Lazy"参数的默认值是"true"。

目前,在未连接的工作进程之间发送消息会导致错误。 此功能及其相关接口应被视为实验性质,并可能在未来版本中更改。

重要的外部软件包

除了在Julia中实现并行之外,还有很多外部包值得一提。 例如,https://github.com/JuliaParallel/MPI.jl ['MPI。jl']是MPI协议的Julia包装器。 https://github.com/JuliaParallel/Dagger.jl ['匕首。jl']提供类似库的功能https://dask.org /[Dask]在Python中,和https://github.com/JuliaParallel/Distributedarrays.jl ['DistributedArrays.jl']支持多个工作流中的分布式阵列操作,如已经描述的

还有必要提到Julia GPU编程生态系统,其中包括以下组件。

  1. cuda.jl包括各种CUDA库,并支持为Nvidia Gpu编译Julia内核。

  2. oneAPI.jl作为统一oneAPI编程模型的包装器,支持在支持的加速器上执行Julia内核。 目前,仅支持Linux。

  3. AMDGPU.jl包括AMD ROCm库,并支持为AMD Gpu编译Julia内核。 目前,仅支持Linux。

  4. 高级库,如https://github.com/JuliaGPU/KernelAbstractions.jl [KernelAbstractions.jl],https://github.com/mcabbott/Tullio.jl [Tullio.jl]和https://github.com/JuliaComputing/ArrayFire.jl [ArrayFire。jl]。

在下面的例子中,我们使用’DistributedArrays。jl’和’CUDA.jl’来跨多个进程分布阵列。 要做到这一点,我们将首先使用`distribute()`和`CuArray()'提出它。

别忘了"分发"。jl’必须导入到所有进程中,使用 '@everywhere'

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CUDA

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

朱莉娅>所有(dC。≈ 4*π)
真的

朱莉娅>打字(dC)
分发的。阵列;阵列{Float64,1,Array{Float64,1}}

朱莉娅>幼崽=CuArray(B);

julia>cuA=CuArray(A);

朱莉娅>cuC=2。*cuA。/幼崽;

朱莉娅>所有(cuC。≈ 4*π);
真的

朱莉娅>typeof(cuC)
[医]阵列{Float64,1}

在下面的例子中,我们使用’DistributedArrays.jl’和’CUDA.jl’跨多个进程分发数组并为其调用通用函数。

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # или (M*v) ./ v
end

'power_method’系统地创建向量并标准化它们。 我们没有在函数声明中指定类型签名。 让我们看看这是否适用于前面提到的数据类型。

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia>cuesult=power_method(暨,cuv);

朱莉娅>typeof(curesult)
[医]阵列{Float64,1}

julia>dM=分布(M);

julia>dv=分布(v);

julia>dC=power_method(dM,dv);

朱莉娅>打字(dC)
元组{DistributedArrays.DArray{Float64,1,Array{Float64,1}},浮64}

要结束这个外部包的简要概述,请考虑"MPI"。JL`,Mpi协议的Julia包装器。 审查所有内部职能所需的时间太长,因此最好是简单地评估议定书的执行方法。

让我们考虑一个简单的脚本,它调用每个子进程,创建其rank的实例,并在到达主进程时汇总rank。

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl