Распределенные вычисления

Инструменты для реализации распределенных вычислений.

# Distributed.addprocsFunction

addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

Запускает рабочие процессы через указанный диспетчер кластеров.

Например, кластеры Beowulf поддерживаются через специализированный диспетчер, реализованный в пакете ClusterManagers.jl.

Время в секундах, в течение которого запущенный рабочий процесс должен ожидать подключения от главного процесса, можно указать в переменной JULIA_WORKER_TIMEOUT в среде рабочего процесса. Это применимо, только когда в качестве транспортного протокола используется TCP/IP.

Чтобы рабочие процессы не блокировали REPL или содержащую их функцию при запуске процессов программно, следует выполнять addprocs в отдельной задаче.

Примеры

# Асинхронный запуск addprocs для занятых кластеров
t = @async addprocs(...)
# Использование рабочих процессов по мере их включения
if nprocs() > 1   # Проверка, что доступен хотя бы один новый процесс
   ....   # Распределенное выполнение
end
# Получение ИД запущенных рабочих процессов или сообщений об ошибках
if istaskdone(t)   # Проверка завершения addprocs, чтобы из-за fetch не возникала блокировка
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

Добавляет рабочие процессы на удаленных компьютерах через SSH. Настройка производится с помощью именованных аргументов (см. ниже). В частности, с помощью аргумента exename можно указать путь к двоичному файлу julia на удаленных компьютерах.

machines — это вектор, содержащий характеристики компьютеров в виде строк формата [user@]host[:port] [bind_addr[:port]]. По умолчанию user — это текущий пользователь, а port — стандартный SSH-порт. Если указан элемент [bind_addr[:port]], другие рабочие процессы будут подключаться к текущему по указанному адресу bind_addr через порт port.

На удаленном узле можно запустить несколько процессов, передав кортеж в векторе machines в формате (machine_spec, count), где count — это число рабочих процессов, которые нужно запустить на указанном узле. Если в качестве числа рабочих процессов передать :auto, число рабочих процессов будет равно числу потоков ЦП на удаленном узле.

Примеры:

addprocs([
    "remote1",               # один рабочий процесс на узле remote1, выполняющий вход с текущим именем пользователя
    "user@remote2",          # один рабочий процесс на узле remote2, выполняющий вход с именем пользователя user
    "user@remote3:2222",     # для узла remote3 указывается SSH-порт 2222
    ("user@remote4", 4),     # на узле remote4 запускаются 4 рабочих процесса
    ("user@remote5", :auto), # на узле remote5 запускается столько рабочих процессов, сколько имеется потоков ЦП
])

Именованные аргументы:

  • tunnel: при значении true будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. Значение по умолчанию — false.

  • multiplex: при значении true для SSH-туннелирования используется мультиплексирование SSH. Значение по умолчанию — false.

  • ssh: имя исполняемого файла SSH-клиента, используемого для запуска рабочих процессов, или путь к этому файлу. Значение по умолчанию — "ssh".

  • sshflags: задает дополнительные параметры SSH, например $sshflags=-i /home/foo/bar.pem$.

  • max_parallel: задает максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле. По умолчанию — 10.

  • shell: указывает тип оболочки, к которой подключается SSH на рабочих узлах.

    • shell=:posix: оболочка Unix/Linux, совместимая с POSIX (sh, ksh, bash, dash, zsh и т. д.). Используется по умолчанию.

    • shell=:csh: оболочка C shell для Unix (csh, tcsh).

    • shell=:wincmd: Microsoft Windows cmd.exe.

  • dir: задает рабочий каталог рабочих процессов. По умолчанию — текущий каталог узла (определяемый pwd()).

  • enable_threaded_blas: если задано значение true, то BLAS будет работать в добавляемых процессах в многопоточном режиме. Значение по умолчанию — false.

  • exename: имя исполняемого файла julia. По умолчанию имеет значение "$(Sys.BINDIR)/julia" или "$(Sys.BINDIR)/julia-debug" в зависимости от ситуации. На всех удаленных компьютерах рекомендуется использовать одну и ту же версию Julia, иначе при сериализации и распространении кода могут происходить сбои.

  • exeflags: дополнительные флаги, передаваемые рабочим процессам.

  • topology: задает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами вызывает ошибку.

    • topology=:all_to_all: All processes are connected to each other. The default.

    • topology=:master_worker: Only the driver process, i.e. pid 1 connects to the workers. The workers do not connect to each other.

    • topology=:custom: The launch method of the cluster manager specifies the connection topology via fields ident and connect_idents in WorkerConfig. A worker with a cluster manager identity ident will connect to all workers specified in connect_idents.

  • lazy: применимо только для topology=:all_to_all. При значении true соединения между рабочими процессами настраиваются в отложенном режиме, то есть при первом удаленном вызове между рабочими процессами. Значение по умолчанию — true.

  • env: позволяет задать массив из пар строковых значений, таких как env=["JULIA_DEPOT_PATH"=>"/depot"], чтобы запросить о задании переменных среды на удаленном компьютере. По умолчанию только переменная среды JULIA_WORKER_TIMEOUT автоматически передается из локальной в удаленную среду.

  • cmdline_cookie: передает cookie-файл проверки подлинности с использованием параметра --worker командной строки. Применяемая по умолчанию (и более безопасная) передача cookie-файла через поток stdio в SSH может зависать при использовании рабочих процессов Windows с устаревшими версиями Julia или Windows (до появления ConPTY). В этом случае cmdline_cookie=true может служить как обходное решение.

Совместимость: Julia 1.6

Именованные аргументы ssh, shell, env и cmdline_cookie были добавлены в Julia 1.6.

Переменные среды:

Если главному процессу не удается установить соединение с только что запущенным рабочим процессом в течение 60,0 секунд, рабочий процесс считает это неустранимой ошибкой и завершает работу. Это время ожидания можно контролировать через переменную среды JULIA_WORKER_TIMEOUT. Значение JULIA_WORKER_TIMEOUT для главного процесса задает время в секундах, в течение которого только что запущенный рабочий процесс ожидает подключения.

addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers

Запускает np рабочих процессов на локальном узле, используя встроенный диспетчер LocalManager.

Локальные рабочие процессы наследуют текущую пакетную среду (то есть активный проект, LOAD_PATH и DEPOT_PATH) от главного процесса.

Именованные аргументы:

  • restrict::Bool: если имеет значение true (по умолчанию), привязка ограничена адресом 127.0.0.1.

  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: действуют аналогичным образом для SSHManager; см. документацию по addprocs(machines::AbstractVector).

Совместимость: Julia 1.9

Наследование пакетной среды и именованный аргумент env были добавлены в версии Julia 1.9.

# Distributed.nprocsFunction

nprocs()

Получает число доступных процессов.

Примеры

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3

# Distributed.nworkersFunction

nworkers()

Получает число доступных рабочих процессов. Оно на единицу меньше, чем nprocs(), но будет равно nprocs(), если nprocs() == 1.

Примеры

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2

# Distributed.procsMethod

procs()

Возвращает список всех идентификаторов процессов, включая идентификатор 1 (который не включается в workers()).

Примеры

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3

# Distributed.procsMethod

procs(pid::Integer)

Возвращает список всех идентификаторов процессов на одном физическом узле, а именно всех рабочих процессов, привязанных к тому же IP-адресу, что у pid.

# Distributed.workersFunction

workers()

Возвращает список всех идентификаторов рабочих процессов.

Примеры

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3

# Distributed.rmprocsFunction

rmprocs(pids...; waitfor=typemax(Int))

Удаляет указанные рабочие процессы. Учтите, что только процесс 1 может добавлять или удалять рабочие процессы.

Аргумент waitfor задает время ожидания отключения рабочих процессов:  — Если он не указан, rmprocs будет ждать до тех пор, пока все запрошенные процессы pids не будут удалены.

  • Будет выдано исключение ErrorException, если завершить работу всех процессов за указанное время waitfor невозможно.

  • Если waitfor имеет значение 0, вызов немедленно возвращает рабочие процессы. запланированные к удалению в другой задаче. Возвращается объект Task, в котором запланировано удаление. Пользователю следует вызвать wait для этой задачи, прежде чем начинать какие-либо другие параллельные вызовы.

Примеры

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6

# Distributed.interruptFunction

interrupt(pids::Integer...)

Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.

interrupt(pids::AbstractVector=workers())

Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.

# Distributed.myidFunction

myid()

Получает идентификатор текущего процесса.

Примеры

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4

# Distributed.pmapFunction

pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

Преобразует коллекцию c, применяя f к каждому элементу с использованием доступных рабочих процессов и задач.

Если указано несколько аргументов коллекции, f применяется поэлементно.

Учтите, что необходимо обеспечить доступность f для всех рабочих процессов. См. Доступность кода и загрузка пакетов для получения дополнительных сведений.

Если не указать пул рабочих процессов, будут использованы все доступные процессы, т. е. пул по умолчанию.

По умолчанию pmap распределяет вычисления по всем указанным рабочим процессам. Чтобы использовать только локальные процессы и распределить нагрузку по задачам, укажите distributed=false. Это равноценно использованию asyncmap. Например, pmap(f, c; distributed=false) эквивалентно asyncmap(f,c; ntasks=()->nworkers())

Функция pmap также может использовать сочетание процессов и задач с помощью аргумента размера пакетов (batch_size). При размере пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size или меньше. Пакет отправляется в виде одного запроса свободному рабочему процессу, и затем локальная функция asyncmap обрабатывает элементы пакета с помощью множества параллельных задач.

При возникновении какой-либо ошибки функция pmap останавливает обработку оставшейся части коллекции. Чтобы этого не происходило, вы можете использовать аргумент on_error и задать функцию обработки ошибок, которая принимает единственный аргумент, а именно исключение. Эта функция может повторно выдавать ошибку и прерывать обработку либо продолжать работу, возвращая любое значение в inline-режиме вместе с результатами вызывающему объекту.

Рассмотрим следующие два примера кода. В первом примере объект исключения возвращается в inline-режиме, а во втором вместо любого исключения возвращается 0:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Для обработки ошибок можно также повторять невыполненные вычисления. Именованные аргументы retry_delays и retry_check передаются в retry в качестве именованных аргументов delays и check соответственно. Если задана пакетная обработка и происходит сбой всего пакета, все элементы из пакета будут обрабатываться повторно.

Учтите, что если указать и on_error, и retry_delays, то сначала вызывается перехватчик on_error, а затем выполняется повтор. Если on_error не выдает (или не выдает повторно) исключение, обработка элемента не повторяется.

Пример: повторное выполнение f для элемента максимум три раза без какой-либо паузы при наличии ошибок.

pmap(f, c; retry_delays = zeros(3))

Пример: повторное выполнение f только при исключениях, не относящихся к типу InexactError, с экспоненциальным увеличением задержки максимум три раза, а для всех случаев InexactError — возврат на месте значения NaN.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))

# Distributed.RemoteExceptionType

RemoteException(captured)

Фиксирует исключения в удаленных вычислениях и повторно выдает их локально. RemoteException является оболочкой для идентификатора рабочего процесса (pid) и записанного исключения. В CapturedException записывается удаленное исключение и стек вызовов в сериализованной форме на тот момент, когда исключение возникло.

# Distributed.ProcessExitedExceptionType

ProcessExitedException(worker_id::Int)

После завершения клиентского процесса Julia дальнейшие попытки обращения к недостижимому дочернему объекту будут приводить к выдаче исключения.

# Distributed.FutureType

Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

A Future is a placeholder for a single computation of unknown termination status and time. For multiple potential computations, see RemoteChannel. See remoteref_id for identifying an AbstractRemoteRef.

# Distributed.RemoteChannelType

RemoteChannel(pid::Integer=myid())

Ссылка на Channel{Any}(1) для процесса pid. По умолчанию pid — это текущий процесс.

RemoteChannel(f::Function, pid::Integer=myid())

Создает ссылки на удаленные каналы определенного размера и типа. Функция f при выполнении для процесса pid должна возвращать реализацию AbstractChannel.

Например, RemoteChannel(()->Channel{Int}(10), pid) возвращает ссылку на канал типа Int с размером 10 для процесса pid.

По умолчанию pid — это текущий процесс.

# Base.fetchMethod

fetch(x::Future)

Wait for and get the value of a Future. The fetched value is cached locally. Further calls to fetch on the same reference return the cached value. If the remote value is an exception, throws a RemoteException which captures the remote exception and backtrace.

# Base.fetchMethod

fetch(c::RemoteChannel)

Ожидает и получает значение из RemoteChannel. Выдает такие же исключения, как и для Future. Полученный элемент не удаляется.

fetch(x::Any)

Возвращает x.

# Distributed.remotecallMethod

remotecall(f, id::Integer, args...; kwargs...) -> Future

Вызывает функцию f в асинхронном режиме для заданных аргументов в указанном процессе. Возвращает значение типа Future. Именованные аргументы (при наличии) передаются функции f.

# Distributed.remotecall_waitMethod

remotecall_wait(f, id::Integer, args...; kwargs...)

Выполняет более быстрое ожидание wait(remotecall(...)) в одном сообщении рабочего процесса (Worker), задаваемого идентификатором id. Именованные аргументы (при наличии) передаются функции f.

См. также описание wait и remotecall.

# Distributed.remotecall_fetchMethod

remotecall_fetch(f, id::Integer, args...; kwargs...)

Выполняет fetch(remotecall(...)) в одном сообщении. Именованные аргументы (при наличии) передаются функции f. Любые удаленные исключения фиксируются в RemoteException и затем выдаются.

См. также описание fetch и remotecall.

Примеры

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...

# Distributed.remote_doMethod

remote_do(f, id::Integer, args...; kwargs...) -> nothing

Выполняет функцию f в рабочем процессе id в асинхронном режиме. В отличие от remotecall, этот метод не сохраняет результат вычисления, а также не позволяет ожидать завершения.

Успешный вызов показывает, что запрос был принят к выполнению на удаленном узле.

Последовательные вызовы remotecall для одного рабочего процесса сериализируются в порядке их осуществления, однако порядок выполнения на удаленном узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) сериализирует вызовы к f1, а затем к f2 и f3 в таком порядке. Однако выполнение f1 перед f3 в рабочем процессе 2 не гарантируется.

Выдаваемые f исключения выводятся в поток stderr удаленного процесса.

Именованные аргументы (при наличии) передаются функции f.

# Base.put!Method

put!(rr::RemoteChannel, args...)

Сохраняет набор значений в канале RemoteChannel. Если канал заполнен, метод блокирует работу, пока не появится доступное пространство. Возвращается первый аргумент.

# Base.put!Method

put!(rr::Future, v)

Store a value to a Future rr. Futures are write-once remote references. A put! on an already set Future throws an Exception. All asynchronous remote calls return Futures and set the value to the return value of the call upon completion.

# Base.take!Method

take!(rr::RemoteChannel, args...)

Получает значения из удаленного канала (RemoteChannel) rr, удаляя эти значения в процессе.

# Base.isreadyMethod

isready(rr::RemoteChannel, args...)

Определяет, имеет ли RemoteChannel значение, хранимое в нем. Учтите, что эта функция может вызывать состояние гонки, так как к моменту получения результата он может уже не соответствовать истине. Однако ее можно безопасно использовать с объектами Future, так как значения им присваиваются только один раз.

# Base.isreadyMethod

isready(rr::Future)

Determine whether a Future has a value stored to it.

If the argument Future is owned by a different node, this call will block to wait for the answer. It is recommended to wait for rr in a separate task instead or to use a local Channel as a proxy:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # will not block

# Distributed.AbstractWorkerPoolType

AbstractWorkerPool

Супертип для пулов рабочих процессов, таких как WorkerPool и CachingPool. Для пула AbstractWorkerPool необходимо реализовать следующее:  — push! — добавляет новый рабочий процесс в общий пул (доступные + занятые).  — put! — возвращает рабочий процесс в пул доступных.  — take! — забирает рабочий процесс из пула доступных (чтобы использовать его для удаленного выполнения функций).  — length — число доступных рабочих процессов в общем пуле.  — isready — возвращает значение false, если использование take! в пуле приведет к блокировке; в противном случае возвращает true.

Используемые по умолчанию реализации вышеуказанных методов (для AbstractWorkerPool) требуют указания следующих полей:

  • channel::Channel{Int}

  • workers::Set{Int}

Поле channel содержит идентификаторы свободных рабочих процессов, а workers — набор всех процессов, связанных с данным пулом.

# Distributed.WorkerPoolType

WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Создает пул WorkerPool на основе вектора или диапазона идентификаторов рабочих процессов.

Примеры

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))

# Distributed.CachingPoolType

CachingPool(workers::Vector{Int})

Представляет собой реализацию AbstractWorkerPool. remote, remotecall_fetch, pmap (и другие удаленные вызовы для удаленного выполнения функций) работают эффективнее при кэшировании сериализируемых и десериализируемых функций на рабочих узлах, особенно при использовании замыканий (способных записывать большой объем данных).

Удаленный кэш поддерживается в течение всего времени существования возвращаемого объекта CachingPool. Для более ранней очистки кэша используйте clear!(pool).

При использовании глобальных переменных в замыкании записываются только привязки, а не сами данные. Для записи глобальных данных можно использовать блоки let.

Примеры

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

В примере выше foo передается каждому рабочему процессу только один раз.

# Distributed.default_worker_poolFunction

default_worker_pool()

Пул AbstractWorkerPool, содержащий неактивные рабочие процессы (workers) — используется функциями remote(f) и pmap (по умолчанию). Если пул рабочих процессов по умолчанию не задан явно с помощью default_worker_pool!(pool), в качестве пула по умолчанию будет инициализирован WorkerPool.

Примеры

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))

# Distributed.clear!Method

clear!(pool::CachingPool) -> pool

Удаляет все кэшированные функции для всех участвующих рабочих процессов.

# Distributed.remoteFunction

remote([p::AbstractWorkerPool], f) -> Function

Возвращает анонимную функцию, которая выполняет функцию f в доступном рабочем процессе (взятом из пула p типа WorkerPool, если такой пул предоставлен) с использованием remotecall_fetch.

# Distributed.remotecallMethod

remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

Вариант remotecall(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

В этом примере задача выполнялась в процессе pid 2, вызванном из pid 1.

# Distributed.remotecall_waitMethod

remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

Вариант remotecall_wait(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall_wait.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958

# Distributed.remotecall_fetchMethod

remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

Вариант remotecall_fetch(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall_fetch.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958

# Distributed.remote_doMethod

remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

Вариант remote_do(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remote_do.

# Distributed.@spawnatMacro

@spawnat p expr

Создает вокруг выражения замыкание и выполняет это замыкание асинхронно в процессе p. Возвращает Future в качестве результата. Если указать для p в кавычках символьный литерал :any, система выберет используемый обработчик автоматически.

Примеры

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Совместимость: Julia 1.3

Аргумент :any впервые реализован в Julia 1.3.

# Distributed.@fetchMacro

@fetch expr

Эквивалентно fetch(@spawnat :any expr). См. fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2

# Distributed.@fetchfromMacro

@fetchfrom

Эквивалентно fetch(@spawnat p expr). См. fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4

# Distributed.@distributedMacro

@distributed

Распределяет в параллельном режиме память для цикла в следующей форме:

@distributed [reducer] for var = range
    body
end

Указанный диапазон разбивается на секции и используется для локального выполнения во всех рабочих процессах. Если задана функция-редуктор (reducer), то @distributed выполняет локальные преобразования для каждого рабочего процесса и итоговое преобразование для вызывающего процесса.

Учтите, что без функции-редуктора макрос @distributed выполняется асинхронно, т. е. порождает независимые задачи во всех рабочих процессах и немедленно возвращает результат, не ожидая завершения. Чтобы дождаться завершения, поставьте перед вызовом префикс @sync, например:

@sync @distributed for var = range
    body
end

# Distributed.@everywhereMacro

@everywhere [procs()] expr

Выполняет выражение в Main во всех процессах procs. Возникающие в процессах ошибки собираются в исключение CompositeException и затем выдаются. Пример:

@everywhere bar = 1

Это выражение определяет Main.bar для всех текущих процессов. Для добавляемых позже процессов (например, с помощью addprocs()) это выражение определено не будет.

В отличие от @spawnat, макрос @everywhere не записывает никакие локальные переменные. Вместо этого их можно транслировать с использованием интерполяции:

foo = 1
@everywhere bar = $foo

Необязательный аргумент procs позволяет задать подмножество всех процессов, которым необходимо выполнить выражение.

Это эквивалентно вызову remotecall_eval(Main, procs, expr) с двумя дополнительными особенностями:

- Сначала выполняются операторы `using` и `import` для вызывающего процесса, чтобы обеспечить
  предварительную компиляцию пакетов.
- Текущий путь к файлу исходного кода, используемый `include`, распространяется на другие процессы.

# Distributed.clear!Method

clear!(syms, pids=workers(); mod=Main)

Очищает в модулях глобальные привязки, инициализируя их как nothing. Параметр syms должен быть значением типа Symbol или коллекцией значений Symbol. pids и mod указывают процессы и модуль, в которых глобальные переменные будут инициализированы повторно. Будут очищены только имена, определения которых обнаружены в mod.

При запросе об очистке глобальной константы будет выдано исключение.

# Distributed.remoteref_idFunction

remoteref_id(r::AbstractRemoteRef) -> RRID

Объекты Future и RemoteChannel можно идентифицировать по следующим полям:

  • where — узел, где фактически существует используемый объект или хранилище, указываемые по ссылке.

  • whence — узел, с которого была создана удаленная ссылка. Обратите внимание, что это не тот узел, на котором фактически существует указываемый по ссылке объект. Например, при вызове RemoteChannel(2) из главного процесса мы получим для where значение 2, а для whence — 1.

  • id — уникальный идентификатор среди всех ссылок, созданных с узла рабочего процесса, указанного в whence.

В совокупности whence и id уникальным образом идентифицируют ссылку среди всех рабочих процессов.

Функция remoteref_id представляет собой низкоуровневый API-интерфейс, который возвращает объект RRID, являющийся оболочкой для значений whence и id удаленной ссылки.

# Distributed.channel_from_idFunction

channel_from_id(id) -> c

Низкоуровневый API-интерфейс, который возвращает вспомогательный канал AbstractChannel для идентификатора id, возвращаемого функцией remoteref_id. Этот вызов действителен только на узле, где вспомогательный канал существует.

# Distributed.worker_id_from_socketFunction

worker_id_from_socket(s) -> pid

Низкоуровневый API-интерфейс, который принимает IO-подключение или рабочий процесс Worker и возвращает идентификатор pid подключенного к ним рабочего процесса. Это бывает полезно при написании для типа особых методов serialize, которые оптимизируют выводимые данные в зависимости от идентификатора принимающего процесса.

cluster_cookie() -> cookie

Возвращает cookie-файл кластера.

cluster_cookie(cookie) -> cookie

Задает принимаемый cookie-файл в качестве cookie кластера и возвращает этот файл.

Интерфейс диспетчера кластера

Этот интерфейс предоставляет механизм для запуска рабочих процессов Julia и управления ими в различных кластерных средах. В модуле Base есть диспетчеры двух типов: LocalManager для запуска дополнительных рабочих процессов на локальном узле и SSHManager для запуска на удаленных узлах посредством ssh. Для подключения и передачи сообщений между процессами применяются сокеты TCP/IP. Диспетчеры кластеров могут предоставлять другой механизм транспорта.

# Distributed.ClusterManagerType

ClusterManager

Супертип для диспетчеров кластеров, которые управляют рабочими процессами как кластером. Диспетчеры кластеров реализуют способы добавления, удаления и взаимодействия для рабочих процессов. Подтипами данного типа являются SSHManager и LocalManager.

# Distributed.WorkerConfigType

WorkerConfig

Тип, используемый диспетчерами кластеров (ClusterManager) для управления добавляемыми в кластеры рабочими процессами. Некоторые поля используются всеми диспетчерами кластеров для доступа к узлу:

  • io — подключение, используемое для доступа к рабочему процессу (подтип IO или Nothing).

  • host — адрес узла (String либо Nothing).

  • port — используемый на узле порт для подключения к рабочему процессу (Int либо Nothing).

Некоторые поля используются диспетчером кластеров для добавления рабочих процессов к уже инициализированному узлу:

  • count — число рабочих процессов, которые нужно запустить на узле.

  • exename — путь к исполняемому файлу Julia на узле (по умолчанию "$(Sys.BINDIR)/julia") или "$(Sys.BINDIR)/julia-debug"

  • exeflags — флаги, используемые при удаленном запуске Julia.

Поле userdata используется внешними диспетчерами для хранения данных о каждом рабочем процессе.

Некоторые поля используются SSHManager и подобными диспетчерами:

  • tunnel — может иметь значение true (использовать туннелирование), false (не использовать туннелирование) или nothing (использовать режим по умолчанию диспетчера).

  • multiplex — может иметь значение true (использовать для туннелирования мультиплексирование SSH) или false.

  • forward — параметр переадресации, используемый для параметра -L в SSH.

  • bind_addr — адрес для привязки на удаленном узле.

  • sshflags — флаги, используемые при установке SSH-соединения.

  • max_parallel — максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле.

Некоторые поля используются как локальными диспетчерами (LocalManager), так и диспетчерами SSHManager:

  • connect_at — определяет, используется ли данный вызов для настройки подключения между рабочими процессами или между рабочим процессом и главным.

  • process — процесс, который будет подключен (как правило, диспетчер назначает его при выполнении addprocs).

  • ospid — идентификатор процесса согласно ОС узла, используемый для прерывания рабочих процессов.

  • environ — закрытый словарь, используемый локальными диспетчерами или диспетчерами SSH для хранения временных данных.

  • ident — рабочий процесс, определяемый диспетчером кластеров (ClusterManager).

  • connect_idents — список идентификаторов процессов, к которым должен подключаться рабочий процесс при использовании настраиваемой топологии.

  • enable_threaded_blas — задает для рабочих процессов использование BLAS в многопоточном режиме (возможные значения: true, false или nothing).

# Distributed.launchFunction

launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Реализуется в диспетчерах кластеров. Для каждого запускаемого этой функцией рабочего процесса Julia она должна добавлять в массив launched запись WorkerConfig и отправлять уведомление launch_ntfy. Функция ОБЯЗАТЕЛЬНО должна завершать работу, когда все запрошенные диспетчером manager функции будут запущены. params — это словарь всех именованных аргументов, с которыми вызывалась функция addprocs.

# Distributed.manageFunction

manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Реализуется в диспетчерах кластеров. Эта функция вызывается в главном процессе во время существования рабочего процесса с необходимыми значениями op:

  • Со значением :register или :deregister при добавлении или удалении процесса в пуле рабочих процессов Julia.

  • Со значением :interrupt при вызове interrupt(workers). Диспетчер ClusterManager должен отправить соответствующему рабочему процессу сигнал о прерывании.

  • Со значением :finalize для целей очистки.

# Base.killMethod

kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Реализуется в диспетчерах кластеров. Этот метод вызывается rmprocs в главном процессе. Он должен завершать работу удаленного процесса, задаваемого pid. Метод kill(manager::ClusterManager.....) удаленно выполняет exit() для pid.

# Sockets.connectMethod

connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Реализуется в диспетчерах кластеров, использующих особые транспортные протоколы. Метод должен устанавливать логическое подключение к рабочему процессу с идентификатором pid, задаваемое в config, и возвращать пару объектов IO (ввода-вывода). Сообщения от pid к текущему процессу будут считываться из потока instrm, а сообщения, отправляемые процессу pid, — записываться в outstrm. Реализация особого транспортного протокола должна обеспечивать доставку и получение сообщений полностью и в правильном порядке. Метод connect(manager::ClusterManager.....) настраивает подключение сокетов TCP/IP между рабочими процессами.

# Distributed.init_workerFunction

init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Вызывается диспетчерами кластеров, реализующими особые транспортные протоколы. Эта функция инициализирует только что запущенный процесс в качестве рабочего процесса. Аргумент командной строки --worker[=<cookie>] обеспечивает инициализацию процесса как рабочего с использованием в качестве транспортного протокола сокетов TCP/IP. В качестве cookie используется файл cluster_cookie.

# Distributed.start_workerFunction

start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker — это внутренняя функция, являющаяся точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс в качестве рабочего процесса кластера Julia.

Данные host:port записываются в поток out (по умолчанию stdout).

При необходимости функция считывает cookie-файл из stdin, ожидает передачи данных на незанятом порту (либо на порту, который задается параметром командной строки --bind-to) и планирует выполнение задач по обработке входящих TCP-подключений и запросов. Эта функция также может (необязательно) закрывать stdin и перенаправлять поток stderr в stdout.

Она не возвращает никаких значений.

# Distributed.process_messagesFunction

process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Вызывается диспетчерами кластеров, использующими особые транспортные протоколы. Эта функция должна вызываться, когда особая реализация транспортного протокола получает первое сообщение от удаленного рабочего процесса. Настроенный протокол должен обеспечить логическое подключение к удаленному процессу и предоставить два объекта IO: один для входящих сообщений, а другой для сообщений, адресуемых удаленному процессу. Если incoming имеет значение true, соединение инициировано удаленным одноранговым узлом. Тот узел в паре, который инициировал соединение, передает cookie-файл кластера и свой номер версии Julia для согласования подключения и проверки подлинности.

См. также описание cluster_cookie.

# Distributed.default_addprocs_paramsFunction

default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Реализуется в диспетчерах кластеров. Именованные параметры по умолчанию, передаваемые при вызове addprocs(mgr). Минимальный набор параметров доступен при вызове default_addprocs_params()