Распределенные вычисления
Инструменты для реализации распределенных вычислений.
#
Distributed.addprocs
— Function
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
Запускает рабочие процессы через указанный диспетчер кластеров.
Например, кластеры Beowulf поддерживаются через специализированный диспетчер, реализованный в пакете ClusterManagers.jl
.
Время в секундах, в течение которого запущенный рабочий процесс должен ожидать подключения от главного процесса, можно указать в переменной JULIA_WORKER_TIMEOUT
в среде рабочего процесса. Это применимо, только когда в качестве транспортного протокола используется TCP/IP.
Чтобы рабочие процессы не блокировали REPL или содержащую их функцию при запуске процессов программно, следует выполнять addprocs
в отдельной задаче.
Примеры
# Асинхронный запуск addprocs для занятых кластеров
t = @async addprocs(...)
# Использование рабочих процессов по мере их включения
if nprocs() > 1 # Проверка, что доступен хотя бы один новый процесс
.... # Распределенное выполнение
end
# Получение ИД запущенных рабочих процессов или сообщений об ошибках
if istaskdone(t) # Проверка завершения addprocs, чтобы из-за fetch не возникала блокировка
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
Добавляет рабочие процессы на удаленных компьютерах через SSH. Настройка производится с помощью именованных аргументов (см. ниже). В частности, с помощью аргумента exename
можно указать путь к двоичному файлу julia
на удаленных компьютерах.
machines
— это вектор, содержащий характеристики компьютеров в виде строк формата [user@]host[:port] [bind_addr[:port]]
. По умолчанию user
— это текущий пользователь, а port
— стандартный SSH-порт. Если указан элемент [bind_addr[:port]]
, другие рабочие процессы будут подключаться к текущему по указанному адресу bind_addr
через порт port
.
На удаленном узле можно запустить несколько процессов, передав кортеж в векторе machines
в формате (machine_spec, count)
, где count
— это число рабочих процессов, которые нужно запустить на указанном узле. Если в качестве числа рабочих процессов передать :auto
, число рабочих процессов будет равно числу потоков ЦП на удаленном узле.
Примеры:
addprocs([
"remote1", # один рабочий процесс на узле remote1, выполняющий вход с текущим именем пользователя
"user@remote2", # один рабочий процесс на узле remote2, выполняющий вход с именем пользователя user
"user@remote3:2222", # для узла remote3 указывается SSH-порт 2222
("user@remote4", 4), # на узле remote4 запускаются 4 рабочих процесса
("user@remote5", :auto), # на узле remote5 запускается столько рабочих процессов, сколько имеется потоков ЦП
])
Именованные аргументы:
-
tunnel
: при значенииtrue
будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. Значение по умолчанию —false
. -
multiplex
: при значенииtrue
для SSH-туннелирования используется мультиплексирование SSH. Значение по умолчанию —false
. -
ssh
: имя исполняемого файла SSH-клиента, используемого для запуска рабочих процессов, или путь к этому файлу. Значение по умолчанию —"ssh"
. -
sshflags
: задает дополнительные параметры SSH, например $sshflags=-i /home/foo/bar.pem
$. -
max_parallel
: задает максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле. По умолчанию — 10. -
shell
: указывает тип оболочки, к которой подключается SSH на рабочих узлах.-
shell=:posix
: оболочка Unix/Linux, совместимая с POSIX (sh, ksh, bash, dash, zsh и т. д.). Используется по умолчанию. -
shell=:csh
: оболочка C shell для Unix (csh, tcsh). -
shell=:wincmd
: Microsoft Windowscmd.exe
.
-
-
dir
: задает рабочий каталог рабочих процессов. По умолчанию — текущий каталог узла (определяемыйpwd()
). -
enable_threaded_blas
: если задано значениеtrue
, то BLAS будет работать в добавляемых процессах в многопоточном режиме. Значение по умолчанию —false
. -
exename
: имя исполняемого файлаjulia
. По умолчанию имеет значение"$(Sys.BINDIR)/julia"
или"$(Sys.BINDIR)/julia-debug"
в зависимости от ситуации. На всех удаленных компьютерах рекомендуется использовать одну и ту же версию Julia, иначе при сериализации и распространении кода могут происходить сбои. -
exeflags
: дополнительные флаги, передаваемые рабочим процессам. -
topology
: задает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами вызывает ошибку.-
topology=:all_to_all
: All processes are connected to each other. The default. -
topology=:master_worker
: Only the driver process, i.e.pid
1 connects to the workers. The workers do not connect to each other. -
topology=:custom
: Thelaunch
method of the cluster manager specifies the connection topology via fieldsident
andconnect_idents
inWorkerConfig
. A worker with a cluster manager identityident
will connect to all workers specified inconnect_idents
.
-
-
lazy
: применимо только дляtopology=:all_to_all
. При значенииtrue
соединения между рабочими процессами настраиваются в отложенном режиме, то есть при первом удаленном вызове между рабочими процессами. Значение по умолчанию — true. -
env
: позволяет задать массив из пар строковых значений, таких какenv=["JULIA_DEPOT_PATH"=>"/depot"]
, чтобы запросить о задании переменных среды на удаленном компьютере. По умолчанию только переменная средыJULIA_WORKER_TIMEOUT
автоматически передается из локальной в удаленную среду. -
cmdline_cookie
: передает cookie-файл проверки подлинности с использованием параметра--worker
командной строки. Применяемая по умолчанию (и более безопасная) передача cookie-файла через поток stdio в SSH может зависать при использовании рабочих процессов Windows с устаревшими версиями Julia или Windows (до появления ConPTY). В этом случаеcmdline_cookie=true
может служить как обходное решение.
Совместимость: Julia 1.6
Именованные аргументы |
Переменные среды:
Если главному процессу не удается установить соединение с только что запущенным рабочим процессом в течение 60,0 секунд, рабочий процесс считает это неустранимой ошибкой и завершает работу. Это время ожидания можно контролировать через переменную среды JULIA_WORKER_TIMEOUT
. Значение JULIA_WORKER_TIMEOUT
для главного процесса задает время в секундах, в течение которого только что запущенный рабочий процесс ожидает подключения.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
Запускает np
рабочих процессов на локальном узле, используя встроенный диспетчер LocalManager
.
Локальные рабочие процессы наследуют текущую пакетную среду (то есть активный проект, LOAD_PATH
и DEPOT_PATH
) от главного процесса.
Именованные аргументы:
-
restrict::Bool
: если имеет значениеtrue
(по умолчанию), привязка ограничена адресом127.0.0.1
. -
dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: действуют аналогичным образом дляSSHManager
; см. документацию поaddprocs(machines::AbstractVector)
.
Совместимость: Julia 1.9
Наследование пакетной среды и именованный аргумент |
#
Distributed.nprocs
— Function
nprocs()
Получает число доступных процессов.
Примеры
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
#
Distributed.procs
— Method
procs(pid::Integer)
Возвращает список всех идентификаторов процессов на одном физическом узле, а именно всех рабочих процессов, привязанных к тому же IP-адресу, что у pid
.
#
Distributed.workers
— Function
workers()
Возвращает список всех идентификаторов рабочих процессов.
Примеры
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
#
Distributed.rmprocs
— Function
rmprocs(pids...; waitfor=typemax(Int))
Удаляет указанные рабочие процессы. Учтите, что только процесс 1 может добавлять или удалять рабочие процессы.
Аргумент waitfor
задает время ожидания отключения рабочих процессов:
-
Если он не указан,
rmprocs
будет ждать до тех пор, пока все запрошенные процессыpids
не будут удалены. -
Будет выдано исключение
ErrorException
, если завершить работу всех процессов за указанное времяwaitfor
невозможно. -
Если
waitfor
имеет значение 0, вызов немедленно возвращает рабочие процессы. запланированные к удалению в другой задаче. Возвращается объектTask
, в котором запланировано удаление. Пользователю следует вызватьwait
для этой задачи, прежде чем начинать какие-либо другие параллельные вызовы.
Примеры
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
#
Distributed.interrupt
— Function
interrupt(pids::Integer...)
Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.
interrupt(pids::AbstractVector=workers())
Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.
#
Distributed.myid
— Function
myid()
Получает идентификатор текущего процесса.
Примеры
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
#
Distributed.pmap
— Function
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Преобразует коллекцию c
, применяя f
к каждому элементу с использованием доступных рабочих процессов и задач.
Если указано несколько аргументов коллекции, f
применяется поэлементно.
Учтите, что необходимо обеспечить доступность f
для всех рабочих процессов. См. Доступность кода и загрузка пакетов для получения дополнительных сведений.
Если не указать пул рабочих процессов, будут использованы все доступные процессы, т. е. пул по умолчанию.
По умолчанию pmap
распределяет вычисления по всем указанным рабочим процессам. Чтобы использовать только локальные процессы и распределить нагрузку по задачам, укажите distributed=false
. Это равноценно использованию asyncmap
. Например, pmap(f, c; distributed=false)
эквивалентно asyncmap(f,c; ntasks=()->nworkers())
Функция pmap
также может использовать сочетание процессов и задач с помощью аргумента размера пакетов (batch_size
). При размере пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size
или меньше. Пакет отправляется в виде одного запроса свободному рабочему процессу, и затем локальная функция asyncmap
обрабатывает элементы пакета с помощью множества параллельных задач.
При возникновении какой-либо ошибки функция pmap
останавливает обработку оставшейся части коллекции. Чтобы этого не происходило, вы можете использовать аргумент on_error
и задать функцию обработки ошибок, которая принимает единственный аргумент, а именно исключение. Эта функция может повторно выдавать ошибку и прерывать обработку либо продолжать работу, возвращая любое значение в inline-режиме вместе с результатами вызывающему объекту.
Рассмотрим следующие два примера кода. В первом примере объект исключения возвращается в inline-режиме, а во втором вместо любого исключения возвращается 0:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Для обработки ошибок можно также повторять невыполненные вычисления. Именованные аргументы retry_delays
и retry_check
передаются в retry
в качестве именованных аргументов delays
и check
соответственно. Если задана пакетная обработка и происходит сбой всего пакета, все элементы из пакета будут обрабатываться повторно.
Учтите, что если указать и on_error
, и retry_delays
, то сначала вызывается перехватчик on_error
, а затем выполняется повтор. Если on_error
не выдает (или не выдает повторно) исключение, обработка элемента не повторяется.
Пример: повторное выполнение f
для элемента максимум три раза без какой-либо паузы при наличии ошибок.
pmap(f, c; retry_delays = zeros(3))
Пример: повторное выполнение f
только при исключениях, не относящихся к типу InexactError
, с экспоненциальным увеличением задержки максимум три раза, а для всех случаев InexactError
— возврат на месте значения NaN
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
#
Distributed.RemoteException
— Type
RemoteException(captured)
Фиксирует исключения в удаленных вычислениях и повторно выдает их локально. RemoteException
является оболочкой для идентификатора рабочего процесса (pid
) и записанного исключения. В CapturedException
записывается удаленное исключение и стек вызовов в сериализованной форме на тот момент, когда исключение возникло.
#
Distributed.ProcessExitedException
— Type
ProcessExitedException(worker_id::Int)
После завершения клиентского процесса Julia дальнейшие попытки обращения к недостижимому дочернему объекту будут приводить к выдаче исключения.
#
Distributed.Future
— Type
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
A Future
is a placeholder for a single computation of unknown termination status and time. For multiple potential computations, see RemoteChannel
. See remoteref_id
for identifying an AbstractRemoteRef
.
#
Distributed.RemoteChannel
— Type
RemoteChannel(pid::Integer=myid())
Ссылка на Channel{Any}(1)
для процесса pid
. По умолчанию pid
— это текущий процесс.
RemoteChannel(f::Function, pid::Integer=myid())
Создает ссылки на удаленные каналы определенного размера и типа. Функция f
при выполнении для процесса pid
должна возвращать реализацию AbstractChannel
.
Например, RemoteChannel(()->Channel{Int}(10), pid)
возвращает ссылку на канал типа Int
с размером 10 для процесса pid
.
По умолчанию pid
— это текущий процесс.
#
Base.fetch
— Method
fetch(x::Future)
Wait for and get the value of a Future
. The fetched value is cached locally. Further calls to fetch
on the same reference return the cached value. If the remote value is an exception, throws a RemoteException
which captures the remote exception and backtrace.
#
Base.fetch
— Method
fetch(x::Any)
Возвращает x
.
fetch(c::RemoteChannel)
Ожидает и получает значение из RemoteChannel
. Выдает такие же исключения, как и для Future
. Полученный элемент не удаляется.
#
Distributed.remotecall_wait
— Method
remotecall_wait(f, id::Integer, args...; kwargs...)
Выполняет более быстрое ожидание wait(remotecall(...))
в одном сообщении рабочего процесса (Worker
), задаваемого идентификатором id
. Именованные аргументы (при наличии) передаются функции f
.
См. также описание wait
и remotecall
.
#
Distributed.remotecall_fetch
— Method
remotecall_fetch(f, id::Integer, args...; kwargs...)
Выполняет fetch(remotecall(...))
в одном сообщении. Именованные аргументы (при наличии) передаются функции f
. Любые удаленные исключения фиксируются в RemoteException
и затем выдаются.
См. также описание fetch
и remotecall
.
Примеры
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
#
Distributed.remote_do
— Method
remote_do(f, id::Integer, args...; kwargs...) -> nothing
Выполняет функцию f
в рабочем процессе id
в асинхронном режиме. В отличие от remotecall
, этот метод не сохраняет результат вычисления, а также не позволяет ожидать завершения.
Успешный вызов показывает, что запрос был принят к выполнению на удаленном узле.
Последовательные вызовы remotecall
для одного рабочего процесса сериализируются в порядке их осуществления, однако порядок выполнения на удаленном узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
сериализирует вызовы к f1
, а затем к f2
и f3
в таком порядке. Однако выполнение f1
перед f3
в рабочем процессе 2 не гарантируется.
Выдаваемые f
исключения выводятся в поток stderr
удаленного процесса.
Именованные аргументы (при наличии) передаются функции f
.
#
Base.put!
— Method
put!(rr::RemoteChannel, args...)
Сохраняет набор значений в канале RemoteChannel
. Если канал заполнен, метод блокирует работу, пока не появится доступное пространство. Возвращается первый аргумент.
#
Base.take!
— Method
take!(rr::RemoteChannel, args...)
Получает значения из удаленного канала (RemoteChannel
) rr
, удаляя эти значения в процессе.
#
Base.isready
— Method
isready(rr::RemoteChannel, args...)
Определяет, имеет ли RemoteChannel
значение, хранимое в нем. Учтите, что эта функция может вызывать состояние гонки, так как к моменту получения результата он может уже не соответствовать истине. Однако ее можно безопасно использовать с объектами Future
, так как значения им присваиваются только один раз.
#
Base.isready
— Method
isready(rr::Future)
Determine whether a Future
has a value stored to it.
If the argument Future
is owned by a different node, this call will block to wait for the answer. It is recommended to wait for rr
in a separate task instead or to use a local Channel
as a proxy:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
#
Distributed.AbstractWorkerPool
— Type
AbstractWorkerPool
Супертип для пулов рабочих процессов, таких как WorkerPool
и CachingPool
. Для пула AbstractWorkerPool
необходимо реализовать следующее:
-
push!
— добавляет новый рабочий процесс в общий пул (доступные + занятые). -
put!
— возвращает рабочий процесс в пул доступных. -
take!
— забирает рабочий процесс из пула доступных (чтобы использовать его для удаленного выполнения функций). -
length
— число доступных рабочих процессов в общем пуле. -
isready
— возвращает значение false, если использованиеtake!
в пуле приведет к блокировке; в противном случае возвращает true.
Используемые по умолчанию реализации вышеуказанных методов (для AbstractWorkerPool
) требуют указания следующих полей:
-
channel::Channel{Int}
-
workers::Set{Int}
Поле channel
содержит идентификаторы свободных рабочих процессов, а workers
— набор всех процессов, связанных с данным пулом.
#
Distributed.WorkerPool
— Type
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Создает пул WorkerPool
на основе вектора или диапазона идентификаторов рабочих процессов.
Примеры
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
#
Distributed.CachingPool
— Type
CachingPool(workers::Vector{Int})
Представляет собой реализацию AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(и другие удаленные вызовы для удаленного выполнения функций) работают эффективнее при кэшировании сериализируемых и десериализируемых функций на рабочих узлах, особенно при использовании замыканий (способных записывать большой объем данных).
Удаленный кэш поддерживается в течение всего времени существования возвращаемого объекта CachingPool
. Для более ранней очистки кэша используйте clear!(pool)
.
При использовании глобальных переменных в замыкании записываются только привязки, а не сами данные. Для записи глобальных данных можно использовать блоки let
.
Примеры
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
В примере выше foo
передается каждому рабочему процессу только один раз.
#
Distributed.default_worker_pool
— Function
default_worker_pool()
Пул AbstractWorkerPool
, содержащий неактивные рабочие процессы (workers
) — используется функциями remote(f)
и pmap
(по умолчанию). Если пул рабочих процессов по умолчанию не задан явно с помощью default_worker_pool!(pool)
, в качестве пула по умолчанию будет инициализирован WorkerPool
.
Примеры
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
#
Distributed.clear!
— Method
clear!(pool::CachingPool) -> pool
Удаляет все кэшированные функции для всех участвующих рабочих процессов.
#
Distributed.remote
— Function
remote([p::AbstractWorkerPool], f) -> Function
Возвращает анонимную функцию, которая выполняет функцию f
в доступном рабочем процессе (взятом из пула p
типа WorkerPool
, если такой пул предоставлен) с использованием remotecall_fetch
.
#
Distributed.remotecall
— Method
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
Вариант remotecall(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
В этом примере задача выполнялась в процессе pid 2, вызванном из pid 1.
#
Distributed.remotecall_wait
— Method
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
Вариант remotecall_wait(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall_wait
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
#
Distributed.remotecall_fetch
— Method
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
Вариант remotecall_fetch(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall_fetch
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
#
Distributed.remote_do
— Method
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
Вариант remote_do(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remote_do
.
#
Distributed.@spawnat
— Macro
@spawnat p expr
Создает вокруг выражения замыкание и выполняет это замыкание асинхронно в процессе p
. Возвращает Future
в качестве результата. Если указать для p
в кавычках символьный литерал :any
, система выберет используемый обработчик автоматически.
Примеры
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Совместимость: Julia 1.3
Аргумент |
#
Distributed.@distributed
— Macro
@distributed
Распределяет в параллельном режиме память для цикла в следующей форме:
@distributed [reducer] for var = range body end
Указанный диапазон разбивается на секции и используется для локального выполнения во всех рабочих процессах. Если задана функция-редуктор (reducer), то @distributed
выполняет локальные преобразования для каждого рабочего процесса и итоговое преобразование для вызывающего процесса.
Учтите, что без функции-редуктора макрос @distributed
выполняется асинхронно, т. е. порождает независимые задачи во всех рабочих процессах и немедленно возвращает результат, не ожидая завершения. Чтобы дождаться завершения, поставьте перед вызовом префикс @sync
, например:
@sync @distributed for var = range body end
#
Distributed.@everywhere
— Macro
@everywhere [procs()] expr
Выполняет выражение в Main
во всех процессах procs
. Возникающие в процессах ошибки собираются в исключение CompositeException
и затем выдаются. Пример:
@everywhere bar = 1
Это выражение определяет Main.bar
для всех текущих процессов. Для добавляемых позже процессов (например, с помощью addprocs()
) это выражение определено не будет.
В отличие от @spawnat
, макрос @everywhere
не записывает никакие локальные переменные. Вместо этого их можно транслировать с использованием интерполяции:
foo = 1 @everywhere bar = $foo
Необязательный аргумент procs
позволяет задать подмножество всех процессов, которым необходимо выполнить выражение.
Это эквивалентно вызову remotecall_eval(Main, procs, expr)
с двумя дополнительными особенностями:
- Сначала выполняются операторы `using` и `import` для вызывающего процесса, чтобы обеспечить предварительную компиляцию пакетов. - Текущий путь к файлу исходного кода, используемый `include`, распространяется на другие процессы.
#
Distributed.clear!
— Method
clear!(syms, pids=workers(); mod=Main)
Очищает в модулях глобальные привязки, инициализируя их как nothing
. Параметр syms
должен быть значением типа Symbol
или коллекцией значений Symbol
. pids
и mod
указывают процессы и модуль, в которых глобальные переменные будут инициализированы повторно. Будут очищены только имена, определения которых обнаружены в mod
.
При запросе об очистке глобальной константы будет выдано исключение.
#
Distributed.remoteref_id
— Function
remoteref_id(r::AbstractRemoteRef) -> RRID
Объекты Future
и RemoteChannel
можно идентифицировать по следующим полям:
-
where
— узел, где фактически существует используемый объект или хранилище, указываемые по ссылке. -
whence
— узел, с которого была создана удаленная ссылка. Обратите внимание, что это не тот узел, на котором фактически существует указываемый по ссылке объект. Например, при вызовеRemoteChannel(2)
из главного процесса мы получим дляwhere
значение 2, а дляwhence
— 1. -
id
— уникальный идентификатор среди всех ссылок, созданных с узла рабочего процесса, указанного вwhence
.
В совокупности whence
и id
уникальным образом идентифицируют ссылку среди всех рабочих процессов.
Функция remoteref_id
представляет собой низкоуровневый API-интерфейс, который возвращает объект RRID
, являющийся оболочкой для значений whence
и id
удаленной ссылки.
#
Distributed.channel_from_id
— Function
channel_from_id(id) -> c
Низкоуровневый API-интерфейс, который возвращает вспомогательный канал AbstractChannel
для идентификатора id
, возвращаемого функцией remoteref_id
. Этот вызов действителен только на узле, где вспомогательный канал существует.
#
Distributed.worker_id_from_socket
— Function
worker_id_from_socket(s) -> pid
Низкоуровневый API-интерфейс, который принимает IO
-подключение или рабочий процесс Worker
и возвращает идентификатор pid
подключенного к ним рабочего процесса. Это бывает полезно при написании для типа особых методов serialize
, которые оптимизируют выводимые данные в зависимости от идентификатора принимающего процесса.
cluster_cookie() -> cookie
Возвращает cookie-файл кластера.
cluster_cookie(cookie) -> cookie
Задает принимаемый cookie-файл в качестве cookie кластера и возвращает этот файл.
Интерфейс диспетчера кластера
Этот интерфейс предоставляет механизм для запуска рабочих процессов Julia и управления ими в различных кластерных средах. В модуле Base есть диспетчеры двух типов: LocalManager
для запуска дополнительных рабочих процессов на локальном узле и SSHManager
для запуска на удаленных узлах посредством ssh
. Для подключения и передачи сообщений между процессами применяются сокеты TCP/IP. Диспетчеры кластеров могут предоставлять другой механизм транспорта.
#
Distributed.ClusterManager
— Type
ClusterManager
Супертип для диспетчеров кластеров, которые управляют рабочими процессами как кластером. Диспетчеры кластеров реализуют способы добавления, удаления и взаимодействия для рабочих процессов. Подтипами данного типа являются SSHManager
и LocalManager
.
#
Distributed.WorkerConfig
— Type
WorkerConfig
Тип, используемый диспетчерами кластеров (ClusterManager
) для управления добавляемыми в кластеры рабочими процессами. Некоторые поля используются всеми диспетчерами кластеров для доступа к узлу:
-
io
— подключение, используемое для доступа к рабочему процессу (подтипIO
илиNothing
). -
host
— адрес узла (String
либоNothing
). -
port
— используемый на узле порт для подключения к рабочему процессу (Int
либоNothing
).
Некоторые поля используются диспетчером кластеров для добавления рабочих процессов к уже инициализированному узлу:
-
count
— число рабочих процессов, которые нужно запустить на узле. -
exename
— путь к исполняемому файлу Julia на узле (по умолчанию"$(Sys.BINDIR)/julia"
) или"$(Sys.BINDIR)/julia-debug"
-
exeflags
— флаги, используемые при удаленном запуске Julia.
Поле userdata
используется внешними диспетчерами для хранения данных о каждом рабочем процессе.
Некоторые поля используются SSHManager
и подобными диспетчерами:
-
tunnel
— может иметь значениеtrue
(использовать туннелирование),false
(не использовать туннелирование) илиnothing
(использовать режим по умолчанию диспетчера). -
multiplex
— может иметь значениеtrue
(использовать для туннелирования мультиплексирование SSH) илиfalse
. -
forward
— параметр переадресации, используемый для параметра-L
в SSH. -
bind_addr
— адрес для привязки на удаленном узле. -
sshflags
— флаги, используемые при установке SSH-соединения. -
max_parallel
— максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле.
Некоторые поля используются как локальными диспетчерами (LocalManager
), так и диспетчерами SSHManager
:
-
connect_at
— определяет, используется ли данный вызов для настройки подключения между рабочими процессами или между рабочим процессом и главным. -
process
— процесс, который будет подключен (как правило, диспетчер назначает его при выполненииaddprocs
). -
ospid
— идентификатор процесса согласно ОС узла, используемый для прерывания рабочих процессов. -
environ
— закрытый словарь, используемый локальными диспетчерами или диспетчерами SSH для хранения временных данных. -
ident
— рабочий процесс, определяемый диспетчером кластеров (ClusterManager
). -
connect_idents
— список идентификаторов процессов, к которым должен подключаться рабочий процесс при использовании настраиваемой топологии. -
enable_threaded_blas
— задает для рабочих процессов использование BLAS в многопоточном режиме (возможные значения:true
,false
илиnothing
).
#
Distributed.launch
— Function
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Реализуется в диспетчерах кластеров. Для каждого запускаемого этой функцией рабочего процесса Julia она должна добавлять в массив launched
запись WorkerConfig
и отправлять уведомление launch_ntfy
. Функция ОБЯЗАТЕЛЬНО должна завершать работу, когда все запрошенные диспетчером manager
функции будут запущены. params
— это словарь всех именованных аргументов, с которыми вызывалась функция addprocs
.
#
Distributed.manage
— Function
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Реализуется в диспетчерах кластеров. Эта функция вызывается в главном процессе во время существования рабочего процесса с необходимыми значениями op
:
-
Со значением
:register
или:deregister
при добавлении или удалении процесса в пуле рабочих процессов Julia. -
Со значением
:interrupt
при вызовеinterrupt(workers)
. ДиспетчерClusterManager
должен отправить соответствующему рабочему процессу сигнал о прерывании. -
Со значением
:finalize
для целей очистки.
#
Base.kill
— Method
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Реализуется в диспетчерах кластеров. Этот метод вызывается rmprocs
в главном процессе. Он должен завершать работу удаленного процесса, задаваемого pid
. Метод kill(manager::ClusterManager.....)
удаленно выполняет exit()
для pid
.
#
Sockets.connect
— Method
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Реализуется в диспетчерах кластеров, использующих особые транспортные протоколы. Метод должен устанавливать логическое подключение к рабочему процессу с идентификатором pid
, задаваемое в config
, и возвращать пару объектов IO
(ввода-вывода). Сообщения от pid
к текущему процессу будут считываться из потока instrm
, а сообщения, отправляемые процессу pid
, — записываться в outstrm
. Реализация особого транспортного протокола должна обеспечивать доставку и получение сообщений полностью и в правильном порядке. Метод connect(manager::ClusterManager.....)
настраивает подключение сокетов TCP/IP между рабочими процессами.
#
Distributed.init_worker
— Function
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Вызывается диспетчерами кластеров, реализующими особые транспортные протоколы. Эта функция инициализирует только что запущенный процесс в качестве рабочего процесса. Аргумент командной строки --worker[=<cookie>]
обеспечивает инициализацию процесса как рабочего с использованием в качестве транспортного протокола сокетов TCP/IP. В качестве cookie
используется файл cluster_cookie
.
#
Distributed.start_worker
— Function
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
— это внутренняя функция, являющаяся точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс в качестве рабочего процесса кластера Julia.
Данные host:port записываются в поток out
(по умолчанию stdout).
При необходимости функция считывает cookie-файл из stdin, ожидает передачи данных на незанятом порту (либо на порту, который задается параметром командной строки --bind-to
) и планирует выполнение задач по обработке входящих TCP-подключений и запросов. Эта функция также может (необязательно) закрывать stdin и перенаправлять поток stderr в stdout.
Она не возвращает никаких значений.
#
Distributed.process_messages
— Function
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Вызывается диспетчерами кластеров, использующими особые транспортные протоколы. Эта функция должна вызываться, когда особая реализация транспортного протокола получает первое сообщение от удаленного рабочего процесса. Настроенный протокол должен обеспечить логическое подключение к удаленному процессу и предоставить два объекта IO
: один для входящих сообщений, а другой для сообщений, адресуемых удаленному процессу. Если incoming
имеет значение true
, соединение инициировано удаленным одноранговым узлом. Тот узел в паре, который инициировал соединение, передает cookie-файл кластера и свой номер версии Julia для согласования подключения и проверки подлинности.
См. также описание cluster_cookie
.
#
Distributed.default_addprocs_params
— Function
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Реализуется в диспетчерах кластеров. Именованные параметры по умолчанию, передаваемые при вызове addprocs(mgr)
. Минимальный набор параметров доступен при вызове default_addprocs_params()