Многопроцессорная обработка и распределенные вычисления

Поддержка параллельных вычислений в системах с распределенной памятью реализована в модуле Distributed, который входит в состав стандартной библиотеки Julia.

У большинства современных компьютеров несколько ЦП, а сами компьютеры можно объединять в кластеры. Использование возможностей нескольких ЦП позволяет ускорить многие вычисления. На производительность влияют два основных фактора: быстродействие самих ЦП и скорость их доступа к памяти. Вполне очевидно, что в кластере отдельно взятый ЦП сможет быстрее обращаться к ОЗУ своего компьютера (узла). Более удивительно то, что аналогичная ситуация прослеживается на любом многоядерном компьютере из-за различий в быстродействии основной памяти и кеша. Поэтому в грамотно построенной многопроцессорной среде должна контролироваться привязка того или иного блока памяти к определенному ЦП. Julia обеспечивает многопроцессорную среду на основе передачи сообщений: программы могут выполняться одновременно как несколько процессов в отдельных областях памяти.

Передача сообщений реализована в Julia не так, как в других средах, таких как MPI [1]. Взаимодействие в Julia обычно одностороннее. Это значит, что в случае с двухпроцессной операцией программисту достаточно управлять явным образом только одним процессом. Более того, взаимодействие обычно происходит не просто в виде отправки и получения сообщений, а больше напоминает высокоуровневые операции, такие как вызовы пользовательских функций.

Распределенное программирование в Julia основано на двух примитивах: удаленных ссылках и удаленных вызовах. Удаленная ссылка — это объект, с помощью которого можно обратиться из любого процесса к объекту, хранящемуся в определенном другом процессе. Удаленный вызов — это выполняемый одним процессом запрос на вызов некоторой функции с определенными аргументами в другом (или том же самом) процессе.

Удаленные ссылки могут быть двух видов: метода Future и макроса RemoteChannel.

Удаленный вызов возвращает ссылку Future на результат выполнения. Удаленный вызов возвращает управление немедленно: совершивший его процесс переходит к следующей операции, пока сам вызов выполняется где-то еще. Чтобы дождаться завершения удаленного вызова, вызовите функцию wait для возвращенного объекта Future, а чтобы получить полное результирующее значение, используйте метод fetch.

В свою очередь, объекты RemoteChannel поддерживают перезапись. Например, несколько процессов могут координировать обработку, ссылаясь на один и тот же удаленный канал (Channel).

С каждым процессом связан идентификатор. У процесса, предоставляющего интерактивную командную строку Julia, id всегда равен 1. Процессы, используемые по умолчанию для параллельных операций, называются рабочими (worker). Когда процесс только один, рабочим считается процесс 1. В противном случае рабочими считаются все процессы, кроме процесса 1. Поэтому для получения преимуществ от методов параллельной обработки, таких как pmap, необходимо использовать два процесса или более. Например, добавление одного дополнительного процесса может быть полезно, если вы хотите, чтобы главный процесс продолжал выполнять свои операции, пока рабочий занят длительными вычислениями.

Давайте попробуем разобраться на практике. При использовании команды julia -p n на локальном компьютере запускается n рабочих процессов. Как правило, имеет смысл делать n равным количеству потоков ЦП (логических ядер) на компьютере. Обратите внимание, что при указании аргумента -p автоматически загружается модуль Distributed.

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

Первый аргумент метода remotecall — вызываемая функция. При параллельном программировании в Julia обычно не указываются конкретные процессы или количество доступных процессов, однако remotecall считается низкоуровневым интерфейсом, обеспечивающим более детальный контроль. Второй аргумент remotecall — это id процесса, который будет выполнять работу. Остальные аргументы будут переданы в вызываемую функцию.

Таким образом, в первой строке процессу 2 был отправлен запрос на создание матрицы размером 2 на 2, а во второй строке — запрос на прибавление значения 1 к нему. Результат обоих вычислений доступен в двух объектах Future: r и s. Макрос @spawnat вычисляет выражение, указанное во втором аргументе, в процессе, указанном в первом аргументе.

Иногда может потребоваться немедленно получить значение, вычисленное удаленно. Обычно такое бывает, когда вы получаете из удаленного объекта данные, необходимые для следующей локальной операции. С этой целью существует функция remotecall_fetch. Она эквивалентна функции fetch(remotecall(...)), но более эффективна.

julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085

Этот код возвращает первое значение из массива в рабочем процессе 2. Обратите внимание, что метод fetch в данном случае не перемещает данные, так как выполняется в рабочем процессе, которому принадлежит массив. Это также можно было бы записать следующим образом.

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866

Как вы помните, выражение getindex(r,1,1) равносильно r[1,1], поэтому данный вызов получает первый элемент объекта Future r.

Чтобы упростить себе задачу, можно передать в макрос @spawnat символ :any для автоматического выбора рабочего процесса.

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

Обратите внимание, что мы использовали 1 .+ fetch(r) вместо 1 .+ r. Причина в том, что мы не знаем, где будет выполняться код, поэтому в общем случае для перемещения r в процесс, в котором выполняется сложение, требуется fetch. В данном случае макрос @spawnat знает, что вычисление нужно выполнить в процессе, которому принадлежит r, поэтому fetch будет холостой операцией (работа не производится).

(Стоит отметить, что @spawnat — это не встроенная функция, а макрос, определенный в Julia. Вы можете определять собственные подобные конструкции.)

Важно помнить, что после получения объект Future кеширует свое значение локально. Дальнейшие вызовы fetch не приводят к сетевому прыжку. После получения всех ссылок Future значение, хранящееся удаленно, удаляется.

Макрос @async схож с макросом @spawnat, но выполняет задачи только в локальном процессе. Он служит для создания задачи-поставщика (feeder) для каждого процесса. Каждая задача получает следующий индекс, который необходимо вычислить, дожидается завершения процесса, а затем повторяет выполнение, пока не закончатся индексы. Обратите внимание, что задачи-поставщики не начинают выполняться, пока главная задача не достигнет конца блока @sync. В этот момент она отдает управление и дожидается завершения выполнения всех локальных задач, после чего функция возвращает управление. Начиная с версии 0.7, задачи-поставщики могут сообщать о своем состоянии посредством nextidx, поскольку все они выполняются в одном процессе. Даже если задачи (Tasks) планируются совместно, в некоторых ситуациях, например при асинхронном вводе-выводе, может требоваться блокировка. Это означает, что переключения контекста происходят только в определенные моменты, в данном случае при вызове remotecall_fetch. Это текущее состояние реализации, которое может измениться в будущих версиях Julia. Его цель — обеспечить выполнение до N задач (Tasks) в числе M процессов (Process), то есть модель многопоточности M:N. Для nextidx потребуется модель установки и снятия блокировки, так как одновременное чтение из ресурса и запись в ресурс несколькими процессами представляет опасность.

Доступность кода и загрузка пакетов

Ваш код должен быть доступен любому процессу, в котором он выполняется. Например, введите в командной строке Julia следующее.

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]

Процессу 1 функция rand2 известна, а процессу 2 нет.

Обычно код загружается из файлов или пакетов, при этом можно весьма гибко контролировать то, в какие процессы загружается код. Возьмем файл DummyModule.jl, содержащий следующий код.

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

Чтобы во всех процессах можно было ссылаться на MyType, файл DummyModule.jl необходимо загрузить в каждый процесс. При вызове include("DummyModule.jl") он загружается только в один процесс. Чтобы загрузить файл в каждый процесс, используйте макрос @everywhere (Julia следует запустить командой julia -p 2).

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

Как обычно, модуль DummyModule при этом не включается в область процесса. Для этого требуется использовать ключевое слово using или import. Более того, когда DummyModule вводится в область в одном процессе, в других этого не происходит:

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined
⋮

julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)

Однако вы все же можете передать MyType в процесс, в котором модуль DummyModule загружен, но не включен в область.

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

Файл можно также предварительно загрузить в несколько процессов при запуске с помощью флага -L, а для активации его вычисления можно использовать скрипт активации.

julia -p <n> -L file1.jl -L file2.jl driver.jl

В примере выше процесс Julia, выполняющий скрипт активации, имеет id со значением 1, так же как и процесс, предоставляющий интерактивную командную строку.

Наконец, если DummyModule.jl — это не отдельный файл, а пакет, оператор using DummyModule загружает DummyModule.jl во все процессы, но включает в область только в том процессе, где был вызван оператор using.

Запуск рабочих процессов и управление ими

В базовой установке Julia поддерживаются два типа кластеров:

  • локальный кластер, создаваемый с помощью параметра -p, как показано выше;

  • кластер, охватывающий несколько компьютеров; создается с помощью параметра --machine-file. Для запуска рабочих процессов Julia (из пути текущего хоста) на указанных компьютерах при этом используется вход по ssh без пароля. Каждый компьютер указывается в следующей форме: [count*][user@]host[:port] [bind_addr[:port]]. По умолчанию user — это текущий пользователь, а port — стандартный SSH-порт. count — это количество рабочих процессов, порождаемых на узле (по умолчанию 1). Необязательный элемент bind-to bind_addr[:port] определяет IP-адрес и порт, которые другие рабочие процессы должны использовать для подключения к данному процессу.

Хотя в Julia, как правило, обеспечивается обратная совместимость, распределение кода по рабочим процессам осуществляется с помощью Serialization.serialize. Как указано в соответствующей документации, данный механизм может не работать при использовании разных версий Julia, поэтому лучше, чтобы все рабочие процессы на всех компьютерах использовали одну и ту же версию.

Для программного добавления, удаления и запроса процессов в кластере доступны функции addprocs, rmprocs, workers и другие.

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

Перед вызовом addprocs модуль Distributed должен быть явным образом загружен в главный процесс. В рабочих процессах он становится доступен автоматически.

Обратите внимание, что рабочие процессы не выполняют скрипт запуска ~/.julia/config/startup.jl и не синхронизируют свое глобальное состояние (включая глобальные переменные, новые определения методов и загруженные модули) с другими запущенными процессами. Вы можете использовать addprocs(exeflags="--project"), чтобы инициализировать рабочий процесс с определенной средой, а затем выполнить команду @everywhere using <modulename> или @everywhere include("file.jl").

Для поддержки кластеров других типов можно написать пользовательскую реализацию ClusterManager, как описано ниже в разделе Объекты ClusterManager.

Перемещение данных

В распределенной программе основная часть работы приходится на отправку сообщений и перемещение данных. Для обеспечения высокой производительности и масштабируемости важно свести к минимуму количество сообщений и объем передаваемых данных. Для этого необходимо понимать, как перемещают данные различные конструкции распределенного программирования в Julia.

fetch можно расценивать как операцию явного перемещения данных, так как перемещение объекта на локальный компьютер запрашивается напрямую. Макрос @spawnat (и ряд связанных конструкций) также перемещает данные, но уже не таким очевидным образом, поэтому это можно назвать операцией неявного перемещения данных. Рассмотрим два подхода к созданию случайной матрицы и ее возведению в квадрат.

Способ 1

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

Способ 2

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

julia> fetch(Bref);

Различие кажется банальным, но на самом деле имеет существенное значение из-за поведения макроса @spawnat. При первом способе случайная матрица создается локально, а затем передается в другой процесс, где возводится в квадрат. При втором способе случайная матрица и создается, и возводится в квадрат в другом процессе. Поэтому во втором случае передается гораздо меньше данных, чем в первом.

В этом простейшем примере различие между двумя способами очевидно, так что подходящий выбрать легко. Однако в реальной программе может потребоваться тщательно продумать перемещение данных и, возможно, провести измерения. Например, если матрица A нужна первому процессу, возможно, лучше использовать первый способ. Если же для вычисления матрицы A требуется множество ресурсов и они есть только у текущего процесса, то ее неизбежно придется переместить в другой процесс. Наконец, если текущему процессу практически нечем заняться между вызовами @spawnat и fetch(Bref), возможно, будет лучше вовсе отказаться от параллелизма. Представим также, что вместо rand(1000,1000) у нас более сложная операция. В этом случае, возможно, имеет смысл добавить еще один оператор @spawnat исключительно для нее.

Глобальные переменные

Выражения, выполняемые удаленно посредством @spawnat, или замыкания, назначенные к удаленному выполнению с помощью remotecall, могут ссылаться на глобальные переменные. Глобальные привязки (binding) в модуле Main обрабатываются немного иначе, чем глобальные привязки в других модулях. Рассмотрим следующий фрагмент кода.

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

В этом случае функция sum ДОЛЖНА быть определена в удаленном процессе. Обратите внимание, что A — это глобальная переменная, определенная в локальной рабочей области. У рабочего процесса 2 нет переменной с именем A в Main. Отправка замыкания ()->sum(A) в рабочий процесс 2 приводит к тому, что в нем определяется Main.A. Main.A продолжает существовать в рабочем процессе 2 даже после возврата управления вызовом remotecall_fetch. Удаленные вызовы с внедренными глобальными ссылками (только в модуле Main) управляют глобальными переменными следующим образом.

  • Новые глобальные переменные создаются в целевых рабочих процессах, если к ним происходит обращение в рамках удаленного вызова.

  • Глобальные константы также объявляются как константы на удаленных узлах.

  • Глобальные переменные передаются повторно в целевой рабочий процесс только в контексте удаленного вызова и только в случае изменения значения. Кроме того, глобальные привязки не синхронизируются между узлами кластера. Например:

    A = rand(10,10)
    remotecall_fetch(()->sum(A), 2) # рабочий процесс 2
    A = rand(10,10)
    remotecall_fetch(()->sum(A), 3) # рабочий процесс 3
    A = nothing

    В результате приведенного выше фрагмента кода переменные Main.A в рабочем процессе 2 и Main.A в рабочем процессе 3 будут иметь разные значения, а Main.A на узле 1 принимает значение nothing.

Как вы, возможно, уже догадываетесь, память, связанная с глобальными переменными, может очищаться при их переприсваивании в главном процессе, но в рабочих процессах этого не происходит, так как привязки продолжают действовать. Если определенные глобальные переменные в удаленных узлах больше не нужны, им можно присвоить значение nothing с помощью метода clear!. В результате связанная с ними память высвобождается при выполнении очередного цикла сборки мусора.

Поэтому ссылаться на глобальные переменные в удаленных вызовах следует с осторожностью. А еще лучше по возможности вообще избегать этого. Если ссылаться на глобальные переменные все же необходимо, для их локализации рекомендуется использовать блоки let.

Пример:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

Как видите, глобальная переменная A определена в рабочем процессе 2, но B захватывается как локальная переменная, и поэтому в рабочем процессе 2 привязки для B нет.

Параллельное сопоставление и циклы

К счастью, для многих полезных параллельных вычислений перемещать данные не требуется. Обычный пример — моделирование методом Монте-Карло, для которого несколько процессов могут одновременно проводить независимые модельные испытания. Мы можем использовать @spawnat для подбрасывания монет в двух процессах. Для начала напишем следующую функцию в файле count_heads.jl.

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

Функция count_heads просто складывает n случайных бит. Следующим образом можно провести испытания на двух компьютерах, а затем сложить результаты.

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

Данный пример демонстрирует распространенный и весьма эффективный шаблон параллельного программирования. Итерации выполняются независимо друг от друга в нескольких процессах, а затем их результаты объединяются с помощью какой-либо функции. Процесс объединения называется редукцией, так как обычно предполагает понижение валентности тензора: вектор чисел сводится к одному числу, либо матрица сводится к одной строке или столбцу и т. д. В коде это обычно выглядит как шаблон x = f(x,v[i]), где x — сумматор, f — функция редукции, а v[i] — редуцируемые элементы. Желательно, чтобы функция f была ассоциативной: так операции можно будет выполнять в любом порядке.

Обратите внимание, что наш вариант использования этого шаблона применительно к count_heads можно обобщить. Мы использовали два явных оператора @spawnat, поэтому параллелизм ограничен двумя процессами. Для запуска в любом количестве процессов можно использовать параллельный цикл for, выполняющийся в распределенной памяти. В Julia его можно записать с использованием @distributed следующим образом.

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

Эта конструкция реализует шаблон назначения итераций нескольким процессам с последующим объединением их результатов с помощью указанной редукции (в данном случае (+)). Результатом каждой итерации является значение последнего выражения внутри цикла. В результате вычисления всего выражения параллельного цикла получается окончательный ответ.

Имейте в виду: хотя параллельные циклы for похожи на последовательные, их поведение в корне отличается. В частности, итерации происходят не в определенном порядке, а значения, присваиваемые переменным или массивам, не будут доступны глобально, поскольку итерации выполняются в разных процессах. Переменные, используемые внутри параллельного цикла, копируются и транслируются в каждый процесс.

Например, следующий код не будет работать надлежащим образом.

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

Этот код не инициализирует все элементы a, так как в каждом процессе будет отдельная копия. Таких параллельных циклов for следует избегать. К счастью, данное ограничение можно обойти с помощью общих массивов.

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

Использование внешних переменных в параллельных циклах совершенно оправданно, если переменные предназначены только для чтения.

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

Здесь в каждой итерации функция f применяется к случайно выбранному элементу вектора a, который используется совместно всеми процессами.

Как видите, оператор редукции можно опустить, если он не нужен. В этом случае цикл выполняется асинхронно, то есть порождает независимые задачи во всех доступных рабочих процессах и немедленно возвращает массив объектов Future, не дожидаясь завершения выполнения. Вызывающая сторона может дожидаться завершения выполнения объектов Future где-либо далее, вызывая для них fetch, или дожидаться завершения выполнения в конце цикла, для чего перед ним следует указать @sync, например @sync @distributed for.

В некоторых случаях оператор редукции не нужен, если мы просто хотим применить функцию ко всем целым числам в некотором диапазоне (или в более общем случае ко всем элементам некоторой коллекции). Это еще одна полезная операция, которая называется параллельным сопоставлением. Она реализована в Julia посредством функции pmap. Например, вычислить сингулярные значения для нескольких больших случайных матриц в параллельном режиме можно следующим образом.

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

Функция pmap в Julia предназначена для случаев, когда каждый вызов функции выполняет большой объем работы. Напротив, @distributed for подходит для ситуаций, когда каждая итерация очень простая, вплоть до того, что просто складываются два числа. Для параллельных вычислений с помощью pmap и @distributed for используются только рабочие процессы. В случае с @distributed for окончательная редукция осуществляется в вызывающем процессе.

Удаленные ссылки и объекты AbstractChannel

Удаленные ссылки всегда связаны с реализацией AbstractChannel.

В конкретной реализации AbstractChannel (например, Channel) должны быть реализованы методы put!, take!, fetch, isready и wait. Удаленный объект, на который ссылается Future, хранится в Channel{Any}(1), то есть в канале Channel размера 1 для объектов типа Any.

Объект RemoteChannel, поддерживающий перезапись, может указывать на каналы любого типа и размера или любую другую реализацию AbstractChannel.

Конструктор RemoteChannel(f::Function, pid)() позволяет создавать ссылки на каналы, вмещающие более одного значения определенного типа. f — это функция, выполняемая в процессе pid. Она должна возвращать объект AbstractChannel.

Например, выражение RemoteChannel(()->Channel{Int}(10), pid) возвращает ссылку на канал типа Int размера 10. Канал существует в рабочем процессе pid.

Методы put!, take!, fetch, isready и wait объекта RemoteChannel проксируются во вспомогательное хранилище удаленного процесса.

Таким образом, RemoteChannel можно использовать для ссылки на объекты AbstractChannel, реализованные пользователем. В файле dictchannel.jl в репозитории Examples приводится простой пример, в котором в качестве удаленного хранилища используется словарь.

Каналы и объекты RemoteChannel

  • Объект Channel является локальным для процесса. Рабочий процесс 2 не может напрямую ссылаться на Channel в рабочем процессе 3 (и наоборот). Однако RemoteChannel может отправлять и принимать значения между рабочими процессами.

  • Объект RemoteChannel можно представить как дескриптор объекта Channel.

  • Идентификатор процесса pid, связанный с RemoteChannel, определяет процесс, в котором существует вспомогательное хранилище, то есть вспомогательный объект Channel.

  • Любой процесс со ссылкой на RemoteChannel может отправлять элементы в канал и принимать их из него. Данные автоматически отправляются в процесс (или извлекаются из процесса), с которым связан объект RemoteChannel.

  • При сериализации канала Channel также сериализуются все имеющиеся в нем данные. Поэтому при его десериализации создается копия исходного объекта.

  • Напротив, при сериализации объекта RemoteChannel сериализуется только идентификатор, определяющий расположение и экземпляр Channel, на который ссылается дескриптор. Поэтому десериализованный объект RemoteChannel (в любом рабочем процессе) указывает на то же вспомогательное хранилище, что и исходный.

Приведенный выше пример каналов можно изменить так, чтобы осуществлялось межпроцессное взаимодействие, как показано ниже.

Для обработки единственного удаленного канала jobs запускаются четыре рабочих процесса. В канал записываются задания, идентифицируемые по job_id. Каждая выполняемая удаленно задача в этом примере считывает job_id, ожидает случайное количество времени и записывает в канал results кортеж, состоящий из job_id, времени ожидания и собственного pid. Наконец, в главном процессе выводятся все данные из канала results.

julia> addprocs(4); # Добавляем рабочие процессы

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> @everywhere function do_work(jobs, results) # Определяем рабочую функцию через everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # Имитирует время, затрачиваемое на реальную работу
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий

julia> for p in workers() # Запускаем в рабочих процессах задачи для параллельной обработки запросов
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # Выводим результаты
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

Удаленные ссылки и распределенная сборка мусора

Объекты, на которые указывают удаленные ссылки, можно очистить только после удаления всех связанных ссылок в кластере.

Узел, в котором хранится значение, отслеживает ссылающиеся на него рабочие процессы. Каждый раз, когда объект RemoteChannel или Future (получение которого отменено) сериализуется в рабочий процесс, узел, на который указывает ссылка, получает уведомление. Узел, которому принадлежит значение, также получает уведомление каждый раз, когда объект RemoteChannel или Future (получение которого отменено) очищается сборщиком мусора. Эти действия реализованы во внутреннем сериализаторе с поддержкой кластера. Удаленные ссылки действительны только в контексте выполнения кластера. Сериализация ссылок в обычные объекты IO и десериализация из них не поддерживаются.

Для уведомления отправляются сообщения об отслеживании: сообщение о добавлении ссылки, когда ссылка сериализуется в другой процесс, и сообщение об удалении ссылки, когда ссылка очищается локально сборщиком мусора.

Так как объекты Future записываются один раз и кешируются локально, при применении fetch к Future также обновляется информация об отслеживании ссылок на узле, которому принадлежит значение.

Узел, которому принадлежит значение, удаляет его после очистки всех ссылок на него.

В случае с объектами Future сериализация уже полученного Future в другой узел также сопровождается передачей значения, так как в исходном удаленном хранилище к этому времени значение уже может быть очищено сборщиком.

Важно отметить, что то, когда объект очищается сборщиком мусора локально, зависит от размера объекта и текущей доступности памяти в системе.

В случае с удаленными ссылками размер локального объекта ссылки совсем небольшой, в то время как значение, хранящееся в удаленном узле, может быть весьма большим. Так как локальный объект может не очищаться сборщиком немедленно, рекомендуется явным образом вызывать finalize для локальных экземпляров RemoteChannel или для объектов Future, получение которых отменено. Так как при вызове fetch для объекта Future также удаляется его ссылка из удаленного хранилища, делать это для полученных объектов Future не требуется. Явный вызов finalize приводит к немедленной отправке в удаленный узел сообщения о том, что нужно удалить ссылку на значение.

После ликвидации ссылка становится недействительной и ее нельзя использовать в последующих вызовах.

Локальные вызовы

Данные неизбежно копируются в удаленный вызов для выполнения. Это происходит и при удаленных вызовах, и когда данные сохраняются в RemoteChannel или Future в другом узле. Как и следовало ожидать, это приводит к образованию копий сериализованных объектов в удаленном узле. Однако, когда конечным узлом является локальный узел, то есть идентификатор вызывающего процесса совпадает с идентификатором удаленного узла, вызов выполняется как локальный. Обычно (но не всегда) он выполняется в другой задаче, но сериализации и десериализации данных не происходит. Следовательно, вызов ссылается на те же экземпляры объектов, которые передаются, — копии не создаются. Это поведение демонстрируется ниже.

julia> using Distributed;

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel создается в локальном узле

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # Используем `v` повторно
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> rc = RemoteChannel(()->Channel(3), workers()[1]);   # RemoteChannel создается в удаленном узле

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

Как видно из примера, вызовы метода put! для локального канала RemoteChannel с изменением объекта v между вызовами приводят к тому, что сохраняется один и тот же экземпляр объекта. Когда же узел, которому принадлежит rc, другой, создаются копии v.

Следует отметить, что, как правило, проблемы в этом нет. Учитывать это необходимо только в том случае, если объект сохраняется локально и в то же время изменяется после вызова. В таких случаях, возможно, следует сохранять deepcopy объекта.

Это также верно для удаленных вызовов в локальном узле, как показано в следующем примере.

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # Выполняется в локальном узле

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Выполняется в удаленном узле

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

Как видно и в этом случае, удаленный вызов к локальному узлу работает так же, как прямой вызов. Он изменяет локальные объекты, передаваемые в качестве аргументов. При удаленном вызове операции производятся с копией аргументов.

Еще раз повторим, что, как правило, проблемы в этом нет. Если локальный узел также применяется как вычислительный и аргументы используются после вызова, следует учитывать эту особенность и при необходимости передавать глубокие копии аргументов в вызов, совершаемый в локальном узле. Вызовы в удаленных узлах всегда работают с копиями аргументов.

Общие массивы

Общие массивы используют общую память системы для сопоставления одного и того же массива во множестве процессов. Хотя в чем-то объект SharedArray похож на DArray, его поведение во многом отличается. В случае с DArray каждый процесс имеет локальный доступ только к части данных, причем у всех процессов эти части разные. Напротив, в случае с объектом SharedArray каждый работающий с ним процесс имеет доступ ко всему массиву. SharedArray — это хороший выбор, когда большой объем данных должен быть доступен одновременно двум процессам или более на одном компьютере.

Поддержка общих массивов обеспечивается модулем SharedArrays, который необходимо явным образом загружать во все участвующие процессы.

SharedArray индексируется (с целью присваивания значений и доступа к ним) так же, как обычный массив, причем происходит это эффективно, поскольку соответствующая область памяти доступна локальному процессу. Поэтому большинство алгоритмов естественным образом работают с SharedArray, пусть и в однопроцессном режиме. Если алгоритм требует входного массива Array, базовый массив можно извлечь из SharedArray, вызвав функцию sdata. Для других типов AbstractArray функция sdata просто возвращает сам объект, поэтому sdata можно спокойно применять к любому объекту типа Array.

Конструктор общего массива имеет следующий вид.

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

Он создает N-мерный общий массив битового типа T и размера dims, доступный процессам, указанным в pids. В отличие от распределенных массивов, общий массив доступен только из участвующих процессов, указанных в именованном аргументе pids (а также из процесса, где он создается, если этот процесс выполняется на том же хосте). Обратите внимание, что в SharedArray поддерживаются только элементы типов isbits.

Если указана функция init с сигнатурой initfn(S::SharedArray), она вызывается во всех участвующих рабочих процессах. Можно указать, что каждый рабочий процесс должен выполнить функцию init для отдельной части массива, чтобы распараллелить инициализацию.

Вот небольшой пример.

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

Функция SharedArrays.localindices предоставляет отдельные одномерные диапазоны индексов и иногда бывает удобна для разделения задач между процессами. Конечно же, вы можете разделять работу как вам угодно.

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

Так как базовые данные доступны всем процессам, важно следить за тем, чтобы не возникали конфликты. Пример:

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

приведет к неопределенному поведению. Так как каждый процесс заполняет весь массив собственным pid, в результате останется pid процесса, который выполнялся последним (для любого элемента S).

Чтобы рассмотреть более сложный пример, запустим параллельно следующее ядро.

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

В этом случае, если мы попробуем разделить работу с помощью одномерного индекса, скорее всего, возникнут неприятности: если q[i,j,t] находится близко к концу блока, назначенного одному рабочему процессу, а q[i,j,t+1] — близко к началу блока, назначенного другому процессу, высока вероятность того, что часть q[i,j,t] не будет готова к моменту, когда она будет нужна для вычисления q[i,j,t+1]. В таких случаях лучше делить массив на части вручную. Давайте разделим массив по второму измерению. Определим функцию, которая возвращает индексы (irange, jrange), назначенные данному рабочему процессу.

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # Этому рабочему процессу часть не назначена.
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

Далее определим ядро.

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # Выведем на экран, чтобы знать, что происходит.
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

Также определим вспомогательную оболочку для реализации SharedArray.

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

Теперь давайте сравним три разных версии: выполняющуюся в одном процессе:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

Использующую @distributed:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

Разделяющую работу по частям:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

Если мы создадим массивы SharedArray и замерим время работы этих функций, то получим следующие результаты (при использовании команды julia -p 4).

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

Выполним функции один раз, чтобы произвести JIT-компиляцию, а затем применим макрос @time при втором выполнении.

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

Главное преимущество advection_shared! в том, что трафик между рабочими процессами сводится к минимуму, так что каждый процесс может длительное время работать с назначенной частью.

Общие массивы и распределенная сборка мусора

Так же как удаленные ссылки, общие массивы полагаются на сборщик мусора в узле, где они создаются, для очистки ссылок во всех участвующих рабочих процессах. В коде, который создает множество кратковременно существующих общих массивов, желательно явным образом ликвидировать эти объекты как можно скорее. Это позволит быстрее очищать память и дескрипторы файлов, связанные с общим сегментом.

Объекты ClusterManager

Запуск процессов Julia, управление ими и их объединение в логический кластер осуществляются посредством диспетчеров кластеров. ClusterManager отвечает за следующие действия:

  • запуск рабочих процессов в среде кластера;

  • управление событиями в течение времени существования каждого рабочего процесса;

  • обеспечение передачи данных в случае необходимости.

Кластер в Julia обладает следующими особенностями:

  • Первоначальный процесс Julia, который также называется главным (master), является особым и имеет id 1.

  • Только процесс master может добавлять или удалять рабочие процессы.

  • Все процессы могут напрямую взаимодействовать друг с другом.

Соединения между рабочими процессами (с применением встроенного транспорта TCP/IP) устанавливаются следующим образом:

  • Функция addprocs вызывается в главном процессе с объектом ClusterManager.

  • Функция addprocs вызывает соответствующий метод launch, который порождает необходимое количество рабочих процессов на соответствующих компьютерах.

  • Каждый рабочий процесс начинает прослушивать свободный порт и записывает информацию о своем хосте и порте в поток stdout.

  • Диспетчер кластера захватывает поток stdout каждого рабочего процесса и делает его доступным главному процессу.

  • Главный процесс анализирует эту информацию и устанавливает подключения TCP/IP к каждому рабочему процессу.

  • Каждый рабочий процесс также уведомляется о других рабочих процессах в кластере.

  • Каждый рабочий процесс подключается к всем рабочим процессам с id меньше, чем id данного рабочего процесса.

  • Таким образом формируется многосвязная сеть, в которой каждый рабочий процесс напрямую подключен к всем остальным рабочим процессам.

Хотя по умолчанию транспортный уровень использует обычный сокет TCPSocket, кластер Julia может предоставлять собственный транспорт.

В Julia имеются два встроенных диспетчера кластеров:

LocalManager служит для запуска дополнительных рабочих процессов на том же хосте, позволяя использовать многоядерное и многопроцессорное оборудование.

Таким образом, диспетчер кластера должен отвечать как минимум следующим требованиям:

  • должен являться подтипом абстрактного типа ClusterManager;

  • должен реализовывать метод launch, отвечающий за запуск новых рабочих процессов;

  • должен реализовывать метод manage, который вызывается в ответ на различные события во время существования рабочего процесса (например, на отправку сигнала прерывания).

addprocs(manager::FooManager) требует реализации в FooManager следующих методов.

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

Для примера давайте посмотрим, как реализован диспетчер LocalManager, отвечающий за запуск рабочих процессов на том же хосте.

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

Метод launch принимает следующие аргументы:

  • manager::ClusterManager — диспетчер кластера, с которым вызывается addprocs;

  • params::Dict — все именованные аргументы, передаваемые в addprocs;

  • launched::Array — массив, в который добавляется один объект WorkerConfig или несколько;

  • c::Condition — переменная условия для уведомления о запуске рабочих процессов.

Метод launch вызывается асинхронно в отдельной задаче. Завершение этой задачи сигнализирует о том, что все запрошенные рабочие процессы запущены. Поэтому выход из функции launch ДОЛЖЕН осуществляться, как только все запрошенные рабочие процессы будут запущены.

Запускаемые рабочие процессы соединяются друг с другом и с главным процессом по схеме «все ко всем». Если указать аргумент командной строки --worker[=<cookie>], запускаемые процессы будут самостоятельно инициализироваться как рабочие и устанавливать подключения через сокеты TCP/IP.

Все рабочие процессы в кластере используют тот же файл cookie, что и главный процесс. Если файл cookie не указан (при использовании параметра --worker), рабочий процесс пытается прочитать его из стандартного потока ввода. LocalManager и SSHManager передают файл cookie в новые рабочие процессы через свои стандартные потоки ввода.

По умолчанию рабочий процесс прослушивает свободный порт по адресу, возвращенному в результате вызова getipaddr(). С помощью необязательного аргумента --bind-to bind_addr[:port] можно указать конкретный адрес для прослушивания. Это может быть полезно, если хост подсоединен к нескольким физическим линиям данных.

Примером транспорта, отличного от TCP/IP, в реализации может служить MPI. В этом случае параметр --worker указывать НЕЛЬЗЯ. Вместо этого новые рабочие процессы должны вызывать init_worker(cookie) перед использованием любых параллельных конструкций.

Для каждого запущенного рабочего процесса метод launch должен добавлять в launched объект WorkerConfig (с инициализацией соответствующих полей).

mutable struct WorkerConfig
    # Поля, общие для всех диспетчеров кластеров
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Используется при запуске дополнительных рабочих процессов на хосте
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # Внешние диспетчеры кластеров могут использовать этот объект для хранения информации на уровне отдельного рабочего процесса
    # Для хранения нескольких полей можно использовать словарь.
    userdata::Any

    # Подключения к рабочим процессам через SSHManager или туннель SSH
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Используется локальными диспетчерами или диспетчерами SSH
    connect_at::Any

    [...]
end

Большинство полей в WorkerConfig используются встроенными диспетчерами. Пользовательские диспетчеры кластеров обычно указывают только io или host и port.

  • Если задано поле io, из него считывается информация о хосте и порте. Рабочий процесс Julia выводит свой адрес и порт привязки при запуске. Это позволяет рабочим процессам Julia прослушивать любой свободный порт вместо настройки портов для рабочих процессов вручную.

  • Если поле io не задано, для подключения применяются поля host и port.

  • Поля count, exename и exeflags необходимы для запуска дополнительных рабочих процессов из данного. Например, диспетчер кластера может запустить один рабочий процесс на узле и использовать его для запуска дополнительных рабочих процессов.

    • Если поле count имеет целочисленное значение n, запускается всего n рабочих процессов.

    • Если поле count имеет значение :auto, количество запускаемых рабочих процессов равно количеству потоков ЦП (логических ядер) на компьютере.

    • Поле exename содержит имя исполняемого файла julia, включая полный путь.

    • В поле exeflags указываются аргументы командной строки, с которыми должны запускаться рабочие процессы.

  • Поля tunnel, bind_addr, sshflags и max_parallel применяются, когда для подключения к рабочим процессам из главного требуется туннель SSH.

  • userdata предоставляется для пользовательских диспетчеров кластеров с целью хранения их собственной информации о рабочих процессах.

Метод manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) вызывается в разные моменты во время существования рабочего процесса с соответствующими значениями op:

  • со значением :register или :deregister, когда рабочий процесс добавляется в пул рабочих процессов Julia или удаляется из него;

  • со значением :interrupt, когда вызывается interrupt(workers). Диспетчер ClusterManager должен подать соответствующему рабочему процессу сигнал прерывания;

  • со значением :finalize в целях очистки.

Диспетчеры кластеров с пользовательскими транспортами

Для замены подключений по умолчанию по схеме «все ко всем» через сокеты TCP/IP на пользовательский транспортный уровень требуется немного больше усилий. Количество задач взаимодействия у процесса Julia равно количеству рабочих процессов, к которым он подключен. Например, рассмотрим кластер Julia с 32 процессами в многосвязной сети типа «все ко всем».

  • У каждого процесса Julia, таким образом, 31 задача взаимодействия.

  • Каждая задача обрабатывает все входящие сообщения от одного удаленного рабочего процесса посредством цикла обработки сообщений.

  • Цикл обработки сообщений ожидает объект IO (например, TCPSocket в реализации по умолчанию), считывает всё сообщение, обрабатывает его и ожидает следующего.

  • Сообщения отправляются процессу напрямую из любой задачи Julia — не только из задач взаимодействия — опять-таки посредством соответствующего объекта IO.

Новая реализация, заменяющая транспорт по умолчанию, должна устанавливать подключения к удаленным рабочим процессам и предоставлять соответствующие объекты IO, которых могут ожидать циклы обработки сообщений. Реализации требуют следующие обратные вызовы, относящиеся к диспетчеру.

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

Реализация по умолчанию (использующая сокеты TCP/IP) имеет вид connect(manager::ClusterManager, pid::Integer, config::WorkerConfig).

Метод connect должен возвращать пару объектов IO: один из них предназначен для чтения данных, отправляемых из рабочего процесса pid, а другой — для записи данных, передаваемых в рабочий процесс pid. Пользовательские диспетчеры кластеров могут использовать объект BufferStream в памяти в качестве посредника для передачи данных между пользовательским транспортом, возможно без объектов IO, и встроенной инфраструктурой Julia для параллельной обработки.

BufferStream — это размещаемый в памяти объект IOBuffer, который работает как IO: это поток, который может обрабатываться асинхронно.

В папке clustermanager/0mq в репозитории Examples есть пример использования ZeroMQ для подключения Julia по топологии «звезда» с брокером 0MQ в центре. Примечание. все процессы Julia по-прежнему логически связаны друг с другом — любой из них может отправлять сообщения любому другому напрямую, не учитывая то, что в качестве транспортного уровня применяется 0MQ.

При использовании пользовательских транспортов учитывайте следующие особенности.

  • Рабочие процессы Julia НЕ должны запускаться с параметром --worker. При запуске с параметром --worker новые рабочие процессы по умолчанию используют реализацию транспорта на основе сокетов TCP/IP.

  • Для каждого входящего логического подключения к рабочему процессу необходимо вызывать Base.process_messages(rd::IO, wr::IO)(). В результате запускается новая задача, отвечающая за считывание и запись сообщений, которые направляются рабочему процессу и от него и представлены объектами IO.

  • В рамках инициализации рабочего процесса необходимо вызвать init_worker(cookie, manager::FooManager).

  • Поле connect_at::Any в WorkerConfig может задаваться диспетчером кластера при вызове launch. Значение этого поля передается во всех обратных вызовах connect. Как правило, оно содержит информацию о том, как подключиться к рабочему процессу. Например, транспорт на основе сокетов TCP/IP использует это поле для указания кортежа (host, port), то есть хоста и порта для подключения к рабочему процессу.

Для удаления рабочего процесса из кластера вызывается метод kill(manager, pid, config). С целью надлежащей очистки в главном процессе должны закрываться соответствующие объекты IO. В реализации по умолчанию просто выполняется вызов exit() для указанного удаленного рабочего процесса.

В папке clustermanager/simple в репозитории Examples есть пример, демонстрирующий простую реализацию с использованием сокетов домена UNIX для настройки кластера.

Требования к сети для LocalManager и SSHManager

Кластеры Julia предназначены для работы в уже защищенных средах на основе такой инфраструктуры, как локальные компьютеры, кластеры отделов или даже облако. Этот раздел посвящен требованиям к сетевой безопасности для встроенных диспетчеров LocalManager и SSHManager.

  • Главный процесс не прослушивает никакой порт. Он просто подключается к рабочим процессам.

  • Каждый рабочий процесс привязывается только к одному локальному интерфейсу и прослушивает временный порт, номер которого назначается операционной системой.

  • Диспетчер LocalManager, используемый функцией addprocs(N), по умолчанию привязывается только к интерфейсу обратной петли. Это означает, что рабочие процессы, запущенные позднее на удаленных хостах (возможно, злоумышленником), не могут подключиться к кластеру. Если после addprocs(4) вызвать addprocs(["remote_host"]), произойдет сбой. Иногда пользователям может потребоваться создать кластер, состоящий из их локальной системы и нескольких удаленных систем. Для этого можно явным образом запросить привязку LocalManager к внешнему сетевому интерфейсу с помощью именованного аргумента restrict: addprocs(4; restrict=false).

  • Диспетчер SSHManager, используемый функцией addprocs(list_of_remote_hosts), запускает рабочие процессы на удаленных хостах через SSH. По умолчанию протокол SSH применяется только для запуска рабочих процессов Julia. Для дальнейшего взаимодействия между рабочими процессами и между ними и главным процессом используются обычные сокеты TCP/IP без шифрования. На удаленных хостах должен быть включен вход без пароля. Дополнительные флаги SSH или учетные данные можно указать с помощью именованного аргумента sshflags.

  • Вызов addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>) полезен в том случае, если подключения SSH нужно использовать также для взаимодействия между главным и рабочими процессами. Типичным примером является ситуация, когда среда REPL Julia (то есть главный процесс) выполняется на локальном компьютере, а остальной кластер — в облаке, таком как Amazon EC2. В таком случае в удаленном кластере необходимо открыть только порт 22, а клиент SSH проходит проверку подлинности посредством инфраструктуры открытых ключей (PKI). Учетные данные можно предоставить посредством sshflags, например sshflags=-i <keyfile>``.

    В топологии «все ко всем» (по умолчанию) все рабочие процессы подключаются друг к другу через обычные сокеты TCP. Таким образом, политика безопасности на узлах кластера должна обеспечивать возможность свободного подключения между рабочими процессами с использованием диапазона временных портов (зависит от ОС).

    Защита и шифрование всего трафика между рабочими процессами (посредством SSH) или шифрование отдельных сообщений могут осуществляться пользовательским диспетчером ClusterManager.

  • Если функция addprocs вызывается с параметром multiplex=true, посредством мультиплексирования SSH создается туннель между главным и рабочими процессами. Если вы настроили мультиплексирование SSH самостоятельно и подключение уже установлено, мультиплексирование SSH используется вне зависимости от значения параметра multiplex. Если мультиплексирование включено, переадресация настраивается с использованием существующего подключения (параметр -O forward в SSH). Это может быть полезно, если ваши серверы требуют проверки подлинности на основе пароля: проверки подлинности в Julia можно избежать, выполнив вход на сервер перед вызовом addprocs. Управляющий сокет будет находиться в ~/.ssh/julia-%r@%h:%p в течение сеанса, если только не используется существующее подключение с мультиплексированием. Обратите внимание: если вы создаете несколько процессов на узле и включаете мультиплексирование, пропускная способность может быть ограничена, так как в этом случае процессы совместно используют одно подключение TCP с мультиплексированием.

Все рабочие процессы в кластере используют один и тот же файл cookie, который по умолчанию представляет собой сгенерированную случайным образом строку в главном процессе.

  • Метод cluster_cookie() возвращает файл cookie, а cluster_cookie(cookie)() задает новый файл cookie и возвращает его.

  • Все подключения проходят проверку подлинности на обеих сторонах, чтобы подключаться друг к другу могли только рабочие процессы, запущенные главным процессом.

  • Файл cookie можно передавать рабочим процессам при их запуске с помощью аргумента --worker=<cookie>. Если аргумент --worker используется без указания файла cookie, рабочий процесс пытается прочитать файл cookie из стандартного потока ввода (stdin). После получения файла cookie поток stdin сразу же закрывается.

  • ClusterManager может получать файл cookie из главного процесса, вызывая метод cluster_cookie(). Диспетчеры кластеров, не использующие транспорт TCP/IP по умолчанию (а значит, не указывающие параметр --worker), должны вызывать init_worker(cookie, manager) с файлом cookie главного процесса.

Обратите внимание, что в средах с более высокими требованиями к безопасности может использоваться пользовательская реализация ClusterManager. Например, файлы cookie могут предоставляться заранее и поэтому не указываться в качестве аргумента при запуске.

Указание топологии сети (экспериментальная функция)

Именованный аргумент topology, передаваемый в функцию addprocs, определяет то, как рабочие процессы должны подключаться друг к другу:

  • :all_to_all (значение по умолчанию) — все рабочие процессы подключаются друг к другу;

  • :master_worker — только главный процесс, то есть процесс с pid 1, подключается к рабочим процессам;

  • :custom — метод launch диспетчера кластера задает топологию подключения посредством полей ident и connect_idents в WorkerConfig. Рабочий процесс с идентификатором ident, предоставленным диспетчером кластера, подключается к всем рабочим процессам, указанным в connect_idents.

Именованный аргумент lazy=true|false действует только в том случае, если параметр topology имеет значение :all_to_all. Если задано значение true, сразу после запуска кластера главный процесс подключен к всем рабочим. Подключения между конкретными рабочими процессами устанавливаются при первом удаленном вызове между ними. Это помогает сократить объем ресурсов, выделяемых изначально для взаимодействия внутри кластера. Подключения устанавливаются в зависимости от потребностей параллельной программы во время выполнения. Значение по умолчанию для аргумента lazy — true.

В настоящее время отправка сообщения между неподключенными рабочими процессами приводит к ошибке. Данная функциональная возможность и связанный с ней интерфейс должны считаться экспериментальными по своей сути и могут измениться в будущих выпусках.

Важные внешние пакеты

Помимо реализации параллелизма в Julia, существует множество внешних пакетов, достойных упоминания. Например, MPI.jl — это оболочка Julia для протокола MPI, Dagger.jl обеспечивает функциональные возможности, аналогичные библиотеке Dask в Python, а DistributedArrays.jl поддерживает распределенные операции с массивами в нескольких рабочих процессах, о чем уже рассказывалось в разделе Общие массивы.

Необходимо также упомянуть экосистему программирования GPU Julia, которая включает в себя следующие компоненты.

  1. CUDA.jl включает в себя различные библиотеки CUDA и поддерживает компиляцию ядер Julia для GPU Nvidia.

  2. oneAPI.jl служит оболочкой для унифицированной модели программирования oneAPI и поддерживает выполнение ядер Julia на поддерживаемых ускорителях. В настоящее время поддерживается только Linux.

  3. AMDGPU.jl включает в себя библиотеки ROCm AMD и поддерживает компиляцию ядер Julia для GPU AMD. В настоящее время поддерживается только Linux.

  4. Высокоуровневые библиотеки, такие как KernelAbstractions.jl, Tullio.jl и ArrayFire.jl.

В следующем примере мы используем DistributedArrays.jl и CUDA.jl для распределения массива по нескольким процессам. Для этого мы сначала приведем его посредством distribute() и CuArray().

Не забудьте, что DistributedArrays.jl необходимо импортировать во все процессы с помощью @everywhere.

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CUDA

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC .≈ 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC .≈ 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

В следующем примере мы используем DistributedArrays.jl и CUDA.jl для распределения массива по нескольким процессам и вызова универсальной функции для него.

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # или (M*v) ./ v
end

power_method повторно создает векторы и нормализует их. В объявлении функции мы не указали сигнатуру типов. Давайте посмотрим, сработает ли это для указанных ранее типов данных.

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

В завершение этого краткого обзора внешних пакетов рассмотрим MPI.jl, оболочку Julia для протокола MPI. Рассмотрение всех внутренних функций заняло бы слишком много времени, поэтому будет лучше просто оценить подход к реализации протокола.

Рассмотрим простейший скрипт, который вызывает каждый подпроцесс, создает экземпляр его ранга и по достижении главного процесса суммирует ранги

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl

1. В данном контексте под MPI имеется в виду стандарт MPI-1. Начиная с версии MPI-2, комитет по разработке стандартов MPI ввел новый набор механизмов взаимодействия, которые в совокупности называются «Удаленный доступ к памяти» (RMA). Мотивом для добавления rma в стандарт MPI было стремление упростить шаблоны одностороннего взаимодействия. Дополнительные сведения о новейшем стандарте MPI см. по адресу https://mpi-forum.org/docs.