Распределенные вычисления
#
Distributed
— Module
Инструменты для реализации распределенных вычислений.
#
Distributed.addprocs
— Function
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
Запускает рабочие процессы через указанный диспетчер кластеров.
Например, кластеры Beowulf поддерживаются через специализированный диспетчер, реализованный в пакете ClusterManagers.jl
.
Время в секундах, в течение которого запущенный рабочий процесс должен ожидать подключения от главного процесса, можно указать в переменной JULIA_WORKER_TIMEOUT
в среде рабочего процесса. Это применимо, только когда в качестве транспортного протокола используется TCP/IP.
Чтобы рабочие процессы не блокировали REPL или содержащую их функцию при запуске процессов программно, следует выполнять addprocs
в отдельной задаче.
Примеры
# Асинхронный запуск addprocs для занятых кластеров
t = @async addprocs(...)
# Использование рабочих процессов по мере их включения
if nprocs() > 1 # Проверка, что доступен хотя бы один новый процесс
.... # Распределенное выполнение
end
# Получение ИД запущенных рабочих процессов или сообщений об ошибках
if istaskdone(t) # Проверка завершения addprocs, чтобы из-за fetch не возникала блокировка
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
Добавляет рабочие процессы на удаленных компьютерах через SSH. Настройка производится с помощью именованных аргументов (см. ниже). В частности, с помощью аргумента exename
можно указать путь к двоичному файлу julia
на удаленных компьютерах.
machines
— это вектор, содержащий характеристики компьютеров в виде строк формата [user@]host[:port] [bind_addr[:port]]
. По умолчанию user
— это текущий пользователь, а port
— стандартный SSH-порт. Если указан элемент [bind_addr[:port]]
, другие рабочие процессы будут подключаться к текущему по указанному адресу bind_addr
через порт port
.
На удаленном узле можно запустить несколько процессов, передав кортеж в векторе machines
в формате (machine_spec, count)
, где count
— это число рабочих процессов, которые нужно запустить на указанном узле. Если в качестве числа рабочих процессов передать :auto
, число рабочих процессов будет равно числу потоков ЦП на удаленном узле.
Примеры:
addprocs([
"remote1", # один рабочий процесс на узле remote1, выполняющий вход с текущим именем пользователя
"user@remote2", # один рабочий процесс на узле remote2, выполняющий вход с именем пользователя user
"user@remote3:2222", # для узла remote3 указывается SSH-порт 2222
("user@remote4", 4), # на узле remote4 запускаются 4 рабочих процесса
("user@remote5", :auto), # на узле remote5 запускается столько рабочих процессов, сколько имеется потоков ЦП
])
Именованные аргументы:
-
tunnel
: при значенииtrue
будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. Значение по умолчанию —false
. -
multiplex
: при значенииtrue
для SSH-туннелирования используется мультиплексирование SSH. Значение по умолчанию —false
. -
ssh
: имя исполняемого файла SSH-клиента, используемого для запуска рабочих процессов, или путь к этому файлу. Значение по умолчанию —"ssh"
. -
sshflags
: задает дополнительные параметры SSH, например $sshflags=-i /home/foo/bar.pem
$. -
max_parallel
: задает максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле. По умолчанию — 10. -
shell
: указывает тип оболочки, к которой подключается SSH на рабочих узлах.-
shell=:posix
: оболочка Unix/Linux, совместимая с POSIX (sh, ksh, bash, dash, zsh и т. д.). Используется по умолчанию. -
shell=:csh
: оболочка C для Unix (csh, tcsh). -
shell=:wincmd
:cmd.exe
системы Microsoft Windows.
-
-
dir
: задает рабочий каталог рабочих процессов. По умолчанию — текущий каталог узла (определяемыйpwd()
). -
enable_threaded_blas
: если задано значениеtrue
, то BLAS будет работать в добавляемых процессах в многопоточном режиме. Значение по умолчанию —false
. -
exename
: имя исполняемого файлаjulia
. По умолчанию имеет значение"$(Sys.BINDIR)/julia"
или"$(Sys.BINDIR)/julia-debug"
в зависимости от ситуации. На всех удаленных компьютерах рекомендуется использовать одну и ту же версию Julia, иначе при сериализации и распространении кода могут происходить сбои. -
exeflags
: дополнительные флаги, передаваемые рабочим процессам. -
topology
: задает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами вызывает ошибку.-
topology=:all_to_all
: все процессы подключаются друг к другу. Используется по умолчанию. -
topology=:master_worker
: только главный процесс, то есть процесс сpid
1, подключается к рабочим процессам. Рабочие процессы не подключаются друг к другу. -
topology=:custom
: методlaunch
диспетчера кластера задает топологию подключения посредством полейident
иconnect_idents
вWorkerConfig
. Рабочий процесс с идентификатором диспетчера кластераident
подключается к всем рабочим процессам, указанным вconnect_idents
.
-
-
lazy
: применимо только дляtopology=:all_to_all
. При значенииtrue
соединения между рабочими процессами настраиваются в отложенном режиме, то есть при первом удаленном вызове между рабочими процессами. Значение по умолчанию — true. -
env
: позволяет задать массив из пар строковых значений, таких какenv=["JULIA_DEPOT_PATH"=>"/depot"]
, чтобы запросить задание переменных среды на удаленном компьютере. По умолчанию только переменная средыJULIA_WORKER_TIMEOUT
автоматически передается из локальной в удаленную среду. -
cmdline_cookie
: передает cookie-файл проверки подлинности с использованием параметра--worker
командной строки. Применяемая по умолчанию (и более безопасная) передача cookie-файла через поток stdio в SSH может зависать при использовании рабочих процессов Windows с устаревшими версиями Julia или Windows (до появления ConPTY). В этом случаеcmdline_cookie=true
может служить обходным решением.
Совместимость: Julia 1.6
Именованные аргументы |
Переменные среды:
Если главному процессу не удается установить соединение с только что запущенным рабочим процессом в течение 60,0 секунд, рабочий процесс считает это неустранимой ошибкой и завершает работу. Это время ожидания можно контролировать через переменную среды JULIA_WORKER_TIMEOUT
. Значение JULIA_WORKER_TIMEOUT
для главного процесса задает время в секундах, в течение которого только что запущенный рабочий процесс ожидает подключения.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
Запускает np
рабочих процессов на локальном узле, используя встроенный диспетчер LocalManager
.
Локальные рабочие процессы наследуют текущую пакетную среду (то есть активный проект, LOAD_PATH
и DEPOT_PATH
) от главного процесса.
Обратите внимание, что рабочие процессы не выполняют скрипт запуска |
Именованные аргументы:
-
restrict::Bool
: если имеет значениеtrue
(по умолчанию), привязка ограничена адресом127.0.0.1
. -
dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: действуют аналогичным образом дляSSHManager
; см. документацию поaddprocs(machines::AbstractVector)
.
Совместимость: Julia 1.9
Наследование пакетной среды и именованный аргумент |
#
Distributed.nprocs
— Function
nprocs()
Получает число доступных процессов.
Примеры
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
#
Distributed.procs
— Method
procs(pid::Integer)
Возвращает список всех идентификаторов процессов на одном физическом узле, а именно всех рабочих процессов, привязанных к тому же IP-адресу, что у pid
.
#
Distributed.workers
— Function
workers()
Возвращает список всех идентификаторов рабочих процессов.
Примеры
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
#
Distributed.rmprocs
— Function
rmprocs(pids...; waitfor=typemax(Int))
Удаляет указанные рабочие процессы. Учтите, что только процесс 1 может добавлять или удалять рабочими процессами.
Аргумент waitfor
задает время ожидания отключения рабочих процессов:
-
Если он не указан,
rmprocs
будет ждать до тех пор, пока все запрошенные процессыpids
не будут удалены. -
Если завершить работу всех процессов за указанное время
waitfor
секунд невозможно, будет выдано исключениеErrorException
. -
Если
waitfor
имеет значение 0, вызов немедленно возвращает рабочие процессы, запланированные к удалению в другой задаче. Возвращается объектTask
, в котором запланировано удаление. Пользователю следует вызватьwait
для этой задачи, прежде чем начинать какие-либо другие параллельные вызовы.
Примеры
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
#
Distributed.interrupt
— Function
interrupt(pids::Integer...)
Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.
interrupt(pids::AbstractVector=workers())
Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.
#
Distributed.myid
— Function
myid()
Получает идентификатор текущего процесса.
Примеры
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
#
Distributed.pmap
— Function
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Преобразует коллекцию c
, применяя f
к каждому элементу с использованием доступных рабочих процессов и задач.
Если указано несколько аргументов коллекции, f
применяется поэлементно.
Учтите, что необходимо обеспечить доступность f
для всех рабочих процессов. Дополнительные сведения см. в разделе Доступность кода и загрузка пакетов.
Если не указать пул рабочих процессов, будут использованы все доступные процессы, т. е. пул CachingPool
.
По умолчанию pmap
распределяет вычисления по всем указанным рабочим процессам. Чтобы использовать только локальные процессы и распределить нагрузку по задачам, укажите distributed=false
. Это равноценно использованию asyncmap
. Например, вызов pmap(f, c; distributed=false)
эквивалентен asyncmap(f,c; ntasks=()->nworkers())
.
Функция pmap
также может использовать сочетание процессов и задач с помощью аргумента размера пакетов (batch_size
). При размере пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size
или меньше. Пакет отправляется в виде одного запроса свободному рабочему процессу, и затем локальная функция asyncmap
обрабатывает элементы пакета с помощью множества параллельных задач.
При возникновении какой-либо ошибки функция pmap
останавливает обработку оставшейся части коллекции. Чтобы этого не происходило, вы можете использовать аргумент on_error
и задать функцию обработки ошибок, которая принимает единственный аргумент, а именно исключение. Эта функция может повторно выдавать ошибку и прерывать обработку либо продолжать работу, возвращая любое значение в inline-режиме вместе с результатами вызывающему объекту.
Рассмотрим следующие два примера кода. В первом примере объект исключения возвращается в inline-режиме, а во втором вместо любого исключения возвращается 0:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Для обработки ошибок можно также повторять невыполненные вычисления. Именованные аргументы retry_delays
и retry_check
передаются в retry
в качестве именованных аргументов delays
и check
соответственно. Если задана пакетная обработка и происходит сбой всего пакета, все элементы из пакета будут обрабатываться повторно.
Учтите, что если указать и on_error
, и retry_delays
, то сначала вызывается перехватчик on_error
, а затем выполняется повтор. Если on_error
не выдает (или не выдает повторно) исключение, обработка элемента не повторяется.
Пример: повторное выполнение f
для элемента максимум три раза без какой-либо паузы при наличии ошибок.
pmap(f, c; retry_delays = zeros(3))
Пример: Повторное выполнение f
только при исключениях, не относящихся к типу InexactError
, с экспоненциальным увеличением задержки максимум три раза, а для всех случаев InexactError
— возврат на месте значения NaN
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
#
Distributed.RemoteException
— Type
RemoteException(captured)
Фиксирует исключения в удаленных вычислениях и повторно выдает их локально. RemoteException
является оболочкой для идентификатора рабочего процесса (pid
) и записанного исключения. В CapturedException
записывается удаленное исключение и стек вызовов в сериализованной форме на тот момент, когда исключение возникло.
#
Distributed.ProcessExitedException
— Type
ProcessExitedException(worker_id::Int)
После завершения клиентского процесса Julia дальнейшие попытки обращения к недостижимому дочернему объекту будут приводить к выдаче исключения.
#
Distributed.Future
— Type
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Тип Future
служит для хранения одного вычисления с неизвестным состоянием и временем завершения. Для множества возможных вычислений см. RemoteChannel
. Для идентификации AbstractRemoteRef
см. remoteref_id
.
#
Distributed.RemoteChannel
— Type
RemoteChannel(pid::Integer=myid())
Создает ссылку на Channel{Any}(1)
для процесса pid
. По умолчанию pid
— это текущий процесс.
RemoteChannel(f::Function, pid::Integer=myid())
Создает ссылки на удаленные каналы определенного размера и типа. f
— это функция, которая при выполнении для процесса pid
должна возвращать реализацию AbstractChannel
.
Например, RemoteChannel(()->Channel{Int}(10), pid)
возвращает ссылку на канал типа Int
с размером 10 для процесса pid
.
По умолчанию pid
— это текущий процесс.
#
Base.fetch
— Method
fetch(x::Future)
Ожидает и получает значение типа Future
. Полученное значение помещается в локальный кэш. Последующие вызовы fetch
по той же ссылке будут возвращать кэшированное значение. Если удаленное значение соответствует исключению, то выдается исключение RemoteException
, в котором записаны удаленное исключение и обратная трассировка.
#
Base.fetch
— Method
fetch(c::RemoteChannel)
Ожидает и получает значение из RemoteChannel
. Выдает такие же исключения, как и для Future
. Полученный элемент не удаляется.
fetch(x::Any)
Возвращает x
.
#
Distributed.remotecall_wait
— Method
remotecall_wait(f, id::Integer, args...; kwargs...)
Выполняет более быстрое ожидание wait(remotecall(...))
в одном сообщении рабочего процесса (Worker
), задаваемого идентификатором id
. Именованные аргументы (при наличии) передаются функции f
.
См. также описание wait
и remotecall
.
#
Distributed.remotecall_fetch
— Method
remotecall_fetch(f, id::Integer, args...; kwargs...)
Выполняет fetch(remotecall(...))
в одном сообщении. Именованные аргументы (при наличии) передаются функции f
. Любые удаленные исключения фиксируются в RemoteException
и затем выдаются.
См. также описание fetch
и remotecall
.
Примеры
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
#
Distributed.remote_do
— Method
remote_do(f, id::Integer, args...; kwargs...) -> nothing
Выполняет функцию f
в рабочем процессе id
в асинхронном режиме. В отличие от remotecall
, этот метод не сохраняет результат вычисления, а также не позволяет ожидать завершения.
Успешный вызов показывает, что запрос был принят к выполнению на удаленном узле.
Последовательные вызовы remotecall
для одного рабочего процесса сериализируются в порядке их осуществления, однако порядок выполнения на удаленном узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
сериализирует вызовы f1
, а затем f2
и f3
в таком порядке. Однако выполнение f1
перед f3
в рабочем процессе 2 не гарантируется.
Выдаваемые f
исключения выводятся в поток stderr
удаленного процесса.
Именованные аргументы (при наличии) передаются функции f
.
#
Base.put!
— Method
put!(rr::RemoteChannel, args...)
Сохраняет набор значений в канале RemoteChannel
. Если канал заполнен, метод блокирует работу, пока не появится доступное пространство. Возвращается первый аргумент.
#
Base.put!
— Method
put!(rr::Future, v)
Сохраняет значение в объект rr
типа Future
. Объекты Future
представляют собой однократно записываемые удаленные ссылки. При использовании put!
для уже заданного Future
выдается исключение (Exception
). Все асинхронные удаленные вызовы возвращают объекты Future
и присваивают им возвращаемые значения вызова по его завершении.
#
Base.take!
— Method
take!(rr::RemoteChannel, args...)
Получает значения из удаленного канала RemoteChannel
rr
, удаляя эти значения в процессе.
#
Base.isready
— Method
isready(rr::RemoteChannel, args...)
Определяет, имеет ли RemoteChannel
значение, хранимое в нем. Учтите, что эта функция может вызывать состояние гонки, так как к моменту получения результата он может уже не соответствовать истине. Однако ее можно безопасно использовать с объектами Future
, так как значения им присваиваются только один раз.
#
Base.isready
— Method
isready(rr::Future)
Определяет, имеет ли Future
значение, хранимое в нем.
Если аргумент Future
принадлежит другому узлу, вызов блокирует работу, ожидая ответа. Вместо этого рекомендуется ожидать rr
в отдельной задаче или использовать промежуточный локальный канал (Channel
):
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # блокировки не будет.
#
Distributed.AbstractWorkerPool
— Type
AbstractWorkerPool
Супертип для пулов рабочих процессов, таких как WorkerPool
и CachingPool
. Для пула AbstractWorkerPool
необходимо реализовать следующее:
-
push!
- add a new worker to the overall pool (available + busy) -
put!
- put back a worker to the available pool -
take!
- take a worker from the available pool (to be used for remote function execution) -
length
- number of workers available in the overall pool -
isready
- return false if atake!
on the pool would block, else true
Используемые по умолчанию реализации вышеуказанных методов (для AbstractWorkerPool
) требуют указания следующих полей:
-
channel::Channel{Int}
-
workers::Set{Int}
, где полеchannel
содержит идентификаторы свободных рабочих процессов, аworkers
— набор всех процессов, связанных с данным пулом.
#
Distributed.WorkerPool
— Type
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Создает пул WorkerPool
на основе вектора или диапазона идентификаторов рабочих процессов.
Примеры
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
#
Distributed.CachingPool
— Type
CachingPool(workers::Vector{Int})
Представляет собой реализацию AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(и другие удаленные вызовы для удаленного выполнения функций) работают эффективнее при кэшировании сериализируемых и десериализируемых функций в рабочих узлах, особенно при использовании замыканий (способных записывать большой объем данных).
Удаленный кэш поддерживается в течение всего времени существования возвращаемого объекта CachingPool
. Для более ранней очистки кэша используйте clear!(pool)
.
При использовании глобальных переменных в замыкании записываются только привязки, а не сами данные. Для записи глобальных данных можно использовать блоки let
.
Примеры
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
В примере выше foo
передается каждому рабочему процессу только один раз.
#
Distributed.default_worker_pool
— Function
default_worker_pool()
Пул AbstractWorkerPool
, содержащий неактивные рабочие процессы workers
и используемый для remote(f)
и pmap
(по умолчанию). Если пул рабочих процессов по умолчанию не задан явно с помощью default_worker_pool!(pool)
, в качестве пула по умолчанию будет инициализирован WorkerPool
.
Примеры
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
#
Distributed.clear!
— Function
clear!(syms, pids=workers(); mod=Main)
Очищает в модулях глобальные привязки, инициализируя их как nothing
. Параметр syms
должен быть значением типа Symbol
или коллекцией значений Symbol
. pids
и mod
указывают процессы и модуль, в которых глобальные переменные будут инициализированы повторно. Будут очищены только имена, определения которых обнаружены в mod
.
При запросе об очистке глобальной константы будет выдано исключение.
clear!(pool::CachingPool) -> pool
Удаляет все кэшированные функции для всех участвующих рабочих процессов.
#
Distributed.remote
— Function
remote([p::AbstractWorkerPool], f) -> Function
Возвращает анонимную функцию, которая выполняет функцию f
в доступном рабочем процессе (взятом из пула p
типа WorkerPool
, если такой пул предоставлен) с использованием remotecall_fetch
.
#
Distributed.remotecall
— Method
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
Вариант remotecall(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
В этом примере задача выполнялась в процессе pid 2, вызванном из pid 1.
#
Distributed.remotecall_wait
— Method
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
Вариант remotecall_wait(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall_wait
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
#
Distributed.remotecall_fetch
— Method
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
Вариант remotecall_fetch(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remotecall_fetch
.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
#
Distributed.remote_do
— Method
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
Вариант remote_do(f, pid, ....)
с использованием пула WorkerPool
. Ожидает и забирает из pool
свободный рабочий процесс, а затем выполняет в нем remote_do
.
#
Distributed.@spawn
— Macro
@spawn expr
Создает вокруг выражения замыкание и выполняет это замыкание в автоматически выбранном процессе, возвращая Future
в качестве результата. Этот макрос устарел; вместо него следует использовать @spawnat :any expr
.
Примеры
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Совместимость: Julia 1.3
Начиная с версии Julia 1.3 этот макрос устарел. Используйте вместо него |
#
Distributed.@spawnat
— Macro
@spawnat p expr
Создает вокруг выражения замыкание и выполняет это замыкание асинхронно в процессе p
. Возвращает Future
в качестве результата. Если указать для p
в кавычках символьный литерал :any
, система выберет используемый обработчик автоматически.
Примеры
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Совместимость: Julia 1.3
Аргумент |
#
Distributed.@distributed
— Macro
@distributed
Распределяет в параллельном режиме память для цикла в следующей форме:
@distributed [reducer] for var = range body end
Указанный диапазон разбивается на секции и используется для локального выполнения во всех рабочих процессах. Если задана функция-редуктор (reducer), то @distributed
выполняет локальные преобразования для каждого рабочего процесса и итоговое преобразование для вызывающего процесса.
Учтите, что без функции-редуктора макрос @distributed
выполняется асинхронно, т. е. порождает независимые задачи во всех рабочих процессах и немедленно возвращает результат, не ожидая завершения. Чтобы дождаться завершения, поставьте перед вызовом префикс @sync
, например:
@sync @distributed for var = range body end
#
Distributed.@everywhere
— Macro
@everywhere [procs()] expr
Выполняет выражение в Main
во всех процессах procs
. Возникающие в процессах ошибки собираются в исключение CompositeException
и затем выдаются. Например:
@everywhere bar = 1
Это выражение определяет Main.bar
для всех текущих процессов. Для добавляемых позже процессов (например, с помощью addprocs()
) это выражение определено не будет.
В отличие от @spawnat
, макрос @everywhere
не записывает никакие локальные переменные. Вместо этого их можно транслировать с использованием интерполяции:
foo = 1 @everywhere bar = $foo
Необязательный аргумент procs
позволяет задать подмножество всех процессов, которым необходимо выполнить выражение.
Это эквивалентно вызову remotecall_eval(Main, procs, expr)
с двумя дополнительными особенностями:
- `using` and `import` statements run on the calling process first, to ensure предварительную компиляцию пакетов. - Текущий путь к файлу исходного кода, используемый `include`, распространяется на другие процессы.
#
Distributed.remoteref_id
— Function
remoteref_id(r::AbstractRemoteRef) -> RRID
Объекты Future
и RemoteChannel
можно идентифицировать по следующим полям:
-
where
— узел, где фактически существует используемый объект или хранилище, указываемые по ссылке. -
whence
— узел, с которого была создана удаленная ссылка. Обратите внимание, что это не тот узел, на котором фактически существует указываемый по ссылке объект. Например, при вызовеRemoteChannel(2)
из главного процесса мы получим дляwhere
значение 2, а дляwhence
— 1. -
id
— уникальный идентификатор среди всех ссылок, созданных с узла рабочего процесса, указанного вwhence
.
В совокупности whence
и id
уникальным образом идентифицируют ссылку среди всех рабочих процессов.
Функция remoteref_id
представляет собой низкоуровневый API-интерфейс, который возвращает объект RRID
, являющийся оболочкой для значений whence
и id
удаленной ссылки.
#
Distributed.channel_from_id
— Function
channel_from_id(id) -> c
Низкоуровневый API-интерфейс, который возвращает вспомогательный канал AbstractChannel
для идентификатора id
, возвращаемого функцией remoteref_id
. Этот вызов действителен только на узле, где вспомогательный канал существует.
#
Distributed.worker_id_from_socket
— Function
worker_id_from_socket(s) -> pid
Низкоуровневый API-интерфейс, который принимает IO
-подключение или рабочий процесс Worker
и возвращает идентификатор pid
подключенного к ним рабочего процесса. Это бывает полезно при написании для типа особых методов serialize
, которые оптимизируют выводимые данные в зависимости от идентификатора принимающего процесса.
cluster_cookie() -> cookie
Возвращает cookie-файл кластера.
cluster_cookie(cookie) -> cookie
Задает принимаемый cookie-файл в качестве cookie кластера и возвращает этот файл.
Cluster Manager Interface
This interface provides a mechanism to launch and manage Julia workers on different cluster environments. There are two types of managers present in Base: LocalManager
, for launching additional workers on the same host, and SSHManager
, for launching on remote hosts via ssh
. TCP/IP sockets are used to connect and transport messages between processes. It is possible for Cluster Managers to provide a different transport.
#
Distributed.ClusterManager
— Type
ClusterManager
Супертип для диспетчеров кластеров, которые управляют рабочими процессами как кластером. Диспетчеры кластеров реализуют способы добавления, удаления и взаимодействия для рабочих процессов. Подтипами данного типа являются SSHManager
и LocalManager
.
#
Distributed.WorkerConfig
— Type
WorkerConfig
Тип, используемый диспетчерами кластеров (ClusterManager
) для управления добавляемыми в кластеры рабочими процессами. Некоторые поля используются всеми диспетчерами кластеров для доступа к узлу:
-
io
— подключение, используемое для доступа к рабочему процессу (подтипIO
илиNothing
). -
host
— адрес узла (String
либоNothing
). -
port
— используемый на узле порт для подключения к рабочему процессу (Int
либоNothing
).
Некоторые поля используются диспетчером кластеров для добавления рабочих процессов к уже инициализированному узлу:
-
count
— число рабочих процессов, которые нужно запустить на узле. -
exename
— путь к исполняемому файлу Julia на узле, по умолчанию"$(Sys.BINDIR)/julia"
, или"$(Sys.BINDIR)/julia-debug"
. -
exeflags
— флаги, используемые при удаленном запуске Julia.
Поле userdata
используется внешними диспетчерами для хранения данных о каждом рабочем процессе.
Некоторые поля используются SSHManager
и подобными диспетчерами:
-
tunnel
— может иметь значениеtrue
(использовать туннелирование),false
(не использовать туннелирование) илиnothing
(использовать режим по умолчанию диспетчера). -
multiplex
— может иметь значениеtrue
(использовать для туннелирования мультиплексирование SSH) илиfalse
. -
forward
— параметр переадресации, используемый для параметра-L
в SSH. -
bind_addr
— адрес для привязки на удаленном узле. -
sshflags
— флаги, используемые при установке SSH-соединения. -
max_parallel
— максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле.
Некоторые поля используются как локальными диспетчерами (LocalManager
), так и диспетчерами SSHManager
:
-
connect_at
— определяет, используется ли данный вызов для настройки подключения между рабочими процессами или между рабочим процессом и главным. -
process
— процесс, который будет подключен (как правило, диспетчер назначает его при выполненииaddprocs
). -
ospid
— идентификатор процесса согласно ОС узла, используемый для прерывания рабочих процессов. -
environ
— закрытый словарь, используемый локальными диспетчерами или диспетчерами SSH для хранения временных данных. -
ident
— рабочий процесс, определяемый диспетчером кластеров (ClusterManager
). -
connect_idents
— список идентификаторов процессов, к которым должен подключаться рабочий процесс при использовании настраиваемой топологии. -
enable_threaded_blas
— задает для рабочих процессов использование BLAS в многопоточном режиме (возможные значения:true
,false
илиnothing
).
#
Distributed.launch
— Function
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Реализуется в диспетчерах кластеров. Для каждого запускаемого этой функцией рабочего процесса Julia она должна добавлять в массив launched
запись WorkerConfig
и отправлять уведомление launch_ntfy
. Функция ОБЯЗАТЕЛЬНО должна завершать работу, когда все запрошенные диспетчером manager
функции будут запущены. params
— это словарь всех именованных аргументов, с которыми вызывалась функция addprocs
.
#
Distributed.manage
— Function
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Реализуется в диспетчерах кластеров. Эта функция вызывается в главном процессе во время существования рабочего процесса с необходимыми значениями op
:
-
Со значением
:register
или:deregister
при добавлении или удалении процесса в пуле рабочих процессов Julia. -
Со значением
:interrupt
при вызовеinterrupt(workers)
. ДиспетчерClusterManager
должен подать соответствующему рабочему процессу сигнал прерывания. -
Со значением
:finalize
для целей очистки.
#
Base.kill
— Method
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Реализуется в диспетчерах кластеров. Этот метод вызывается rmprocs
в главном процессе. Он должен завершать работу удаленного процесса, задаваемого pid
. Метод kill(manager::ClusterManager.....)
удаленно выполняет exit()
в pid
.
#
Sockets.connect
— Method
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Реализуется в диспетчерах кластеров, использующих особые транспортные протоколы. Метод должен устанавливать логическое подключение к рабочему процессу с идентификатором pid
, задаваемое в config
, и возвращать пару объектов IO
. Сообщения от pid
к текущему процессу будут считываться из потока instrm
, а сообщения, отправляемые процессу pid
, — записываться в outstrm
. Реализация особого транспортного протокола должна обеспечивать доставку и получение сообщений полностью и в правильном порядке. Метод connect(manager::ClusterManager.....)
настраивает подключение сокетов TCP/IP между рабочими процессами.
#
Distributed.init_worker
— Function
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Вызывается диспетчерами кластеров, реализующими особые транспортные протоколы. Эта функция инициализирует только что запущенный процесс в качестве рабочего процесса. Аргумент командной строки --worker[=<cookie>]
обеспечивает инициализацию процесса как рабочего с использованием в качестве транспортного протокола сокетов TCP/IP. В качестве cookie
используется файл cluster_cookie
.
#
Distributed.start_worker
— Function
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
— это внутренняя функция, являющаяся точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс в качестве рабочего процесса кластера Julia.
Данные host:port записываются в поток out
(по умолчанию stdout).
При необходимости функция считывает cookie-файл из stdin, ожидает передачи данных на незанятом порту (либо на порту, который задается параметром командной строки --bind-to
) и планирует выполнение задач по обработке входящих TCP-подключений и запросов. Эта функция также может (необязательно) закрывать stdin и перенаправлять поток stderr в stdout.
Она не возвращает никаких значений.
#
Distributed.process_messages
— Function
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Вызывается диспетчерами кластеров, использующими особые транспортные протоколы. Эта функция должна вызываться, когда особая реализация транспортного протокола получает первое сообщение от удаленного рабочего процесса. Настроенный протокол должен обеспечить логическое подключение к удаленному процессу и предоставить два объекта IO
: один для входящих сообщений, а другой для сообщений, адресуемых удаленному процессу. Если incoming
имеет значение true
, соединение инициировано удаленным одноранговым узлом. Тот узел в паре, который инициировал соединение, передает cookie-файл кластера и свой номер версии Julia для согласования подключения и проверки подлинности.
См. также описание cluster_cookie
.
#
Distributed.default_addprocs_params
— Function
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Реализуется в диспетчерах кластеров. Именованные параметры по умолчанию, передаваемые при вызове addprocs(mgr)
. Минимальный набор параметров доступен при вызове default_addprocs_params()
.