Документация Engee

Распределенные вычисления

Инструменты для реализации распределенных вычислений.

addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

Запускает рабочие процессы через указанный диспетчер кластеров.

Например, кластеры Beowulf поддерживаются через специализированный диспетчер, реализованный в пакете ClusterManagers.jl.

Время в секундах, в течение которого запущенный рабочий процесс должен ожидать подключения от главного процесса, можно указать в переменной JULIA_WORKER_TIMEOUT в среде рабочего процесса. Это применимо, только когда в качестве транспортного протокола используется TCP/IP.

Чтобы рабочие процессы не блокировали REPL или содержащую их функцию при запуске процессов программно, следует выполнять addprocs в отдельной задаче.

Примеры

# Асинхронный запуск addprocs для занятых кластеров
t = @async addprocs(...)
# Использование рабочих процессов по мере их включения
if nprocs() > 1   # Проверка, что доступен хотя бы один новый процесс
   ....   # Распределенное выполнение
end
# Получение ИД запущенных рабочих процессов или сообщений об ошибках
if istaskdone(t)   # Проверка завершения addprocs, чтобы из-за fetch не возникала блокировка
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end

addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

Добавляет рабочие процессы на удаленных компьютерах через SSH. Настройка производится с помощью именованных аргументов (см. ниже). В частности, с помощью аргумента exename можно указать путь к двоичному файлу julia на удаленных компьютерах.

machines — это вектор, содержащий характеристики компьютеров в виде строк формата [user@]host[:port] [bind_addr[:port]]. По умолчанию user — это текущий пользователь, а port — стандартный SSH-порт. Если указан элемент [bind_addr[:port]], другие рабочие процессы будут подключаться к текущему по указанному адресу bind_addr через порт port.

На удаленном узле можно запустить несколько процессов, передав кортеж в векторе machines в формате (machine_spec, count), где count — это число рабочих процессов, которые нужно запустить на указанном узле. Если в качестве числа рабочих процессов передать :auto, число рабочих процессов будет равно числу потоков ЦП на удаленном узле.

Примеры:

addprocs([
    "remote1",               # один рабочий процесс на узле remote1, выполняющий вход с текущим именем пользователя
    "user@remote2",          # один рабочий процесс на узле remote2, выполняющий вход с именем пользователя user
    "user@remote3:2222",     # для узла remote3 указывается SSH-порт 2222
    ("user@remote4", 4),     # на узле remote4 запускаются 4 рабочих процесса
    ("user@remote5", :auto), # на узле remote5 запускается столько рабочих процессов, сколько имеется потоков ЦП
])

Именованные аргументы:

  • tunnel: при значении true будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. Значение по умолчанию — false.

  • multiplex: при значении true для SSH-туннелирования используется мультиплексирование SSH. Значение по умолчанию — false.

  • ssh: имя исполняемого файла SSH-клиента, используемого для запуска рабочих процессов, или путь к этому файлу. Значение по умолчанию — "ssh".

  • sshflags: задает дополнительные параметры SSH, например $sshflags=-i /home/foo/bar.pem$.

  • max_parallel: задает максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле. По умолчанию — 10.

  • shell: указывает тип оболочки, к которой подключается SSH на рабочих узлах.

    • shell=:posix: оболочка Unix/Linux, совместимая с POSIX (sh, ksh, bash, dash, zsh и т. д.). Используется по умолчанию.

    • shell=:csh: оболочка C для Unix (csh, tcsh).

    • shell=:wincmd: cmd.exe системы Microsoft Windows.

  • dir: задает рабочий каталог рабочих процессов. По умолчанию — текущий каталог узла (определяемый pwd()).

  • enable_threaded_blas: если задано значение true, то BLAS будет работать в добавляемых процессах в многопоточном режиме. Значение по умолчанию — false.

  • exename: имя исполняемого файла julia. По умолчанию имеет значение "$(Sys.BINDIR)/julia" или "$(Sys.BINDIR)/julia-debug" в зависимости от ситуации. На всех удаленных компьютерах рекомендуется использовать одну и ту же версию Julia, иначе при сериализации и распространении кода могут происходить сбои.

  • exeflags: дополнительные флаги, передаваемые рабочим процессам.

  • topology: задает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами вызывает ошибку.

    • topology=:all_to_all: все процессы подключаются друг к другу. Используется по умолчанию.

    • topology=:master_worker: только главный процесс, то есть процесс с pid 1, подключается к рабочим процессам. Рабочие процессы не подключаются друг к другу.

    • topology=:custom: метод launch диспетчера кластера задает топологию подключения посредством полей ident и connect_idents в WorkerConfig. Рабочий процесс с идентификатором диспетчера кластера ident подключается к всем рабочим процессам, указанным в connect_idents.

  • lazy: применимо только для topology=:all_to_all. При значении true соединения между рабочими процессами настраиваются в отложенном режиме, то есть при первом удаленном вызове между рабочими процессами. Значение по умолчанию — true.

  • env: позволяет задать массив из пар строковых значений, таких как env=["JULIA_DEPOT_PATH"=>"/depot"], чтобы запросить задание переменных среды на удаленном компьютере. По умолчанию только переменная среды JULIA_WORKER_TIMEOUT автоматически передается из локальной в удаленную среду.

  • cmdline_cookie: передает cookie-файл проверки подлинности с использованием параметра --worker командной строки. Применяемая по умолчанию (и более безопасная) передача cookie-файла через поток stdio в SSH может зависать при использовании рабочих процессов Windows с устаревшими версиями Julia или Windows (до появления ConPTY). В этом случае cmdline_cookie=true может служить обходным решением.

Совместимость: Julia 1.6

Именованные аргументы ssh, shell, env и cmdline_cookie были добавлены в Julia 1.6.

Переменные среды:

Если главному процессу не удается установить соединение с только что запущенным рабочим процессом в течение 60,0 секунд, рабочий процесс считает это неустранимой ошибкой и завершает работу. Это время ожидания можно контролировать через переменную среды JULIA_WORKER_TIMEOUT. Значение JULIA_WORKER_TIMEOUT для главного процесса задает время в секундах, в течение которого только что запущенный рабочий процесс ожидает подключения.


addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers

Запускает np рабочих процессов на локальном узле, используя встроенный диспетчер LocalManager.

Локальные рабочие процессы наследуют текущую пакетную среду (то есть активный проект, LOAD_PATH и DEPOT_PATH) от главного процесса.

Обратите внимание, что рабочие процессы не выполняют скрипт запуска ~/.julia/config/startup.jl и не синхронизируют свое глобальное состояние (включая параметры командной строки, глобальные переменные, новые определения методов и загруженные модули) с другими запущенными процессами.

Именованные аргументы:

  • restrict::Bool: если имеет значение true (по умолчанию), привязка ограничена адресом 127.0.0.1.

  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: действуют аналогичным образом для SSHManager; см. документацию по addprocs(machines::AbstractVector).

Совместимость: Julia 1.9

Наследование пакетной среды и именованный аргумент env были добавлены в версии Julia 1.9.

nprocs()

Получает число доступных процессов.

Примеры

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
nworkers()

Получает число доступных рабочих процессов. Оно на единицу меньше, чем nprocs(), но будет равно nprocs() при nprocs() == 1.

Примеры

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
procs()

Возвращает список всех идентификаторов процессов, включая идентификатор 1 (который не включается в workers()).

Примеры

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
procs(pid::Integer)

Возвращает список всех идентификаторов процессов на одном физическом узле, а именно всех рабочих процессов, привязанных к тому же IP-адресу, что у pid.

workers()

Возвращает список всех идентификаторов рабочих процессов.

Примеры

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
rmprocs(pids...; waitfor=typemax(Int))

Удаляет указанные рабочие процессы. Учтите, что только процесс 1 может добавлять или удалять рабочими процессами.

Аргумент waitfor задает время ожидания отключения рабочих процессов:

  • Если он не указан, rmprocs будет ждать до тех пор, пока все запрошенные процессы pids не будут удалены.

  • Если завершить работу всех процессов за указанное время waitfor секунд невозможно, будет выдано исключение ErrorException.

  • Если waitfor имеет значение 0, вызов немедленно возвращает рабочие процессы, запланированные к удалению в другой задаче. Возвращается объект Task, в котором запланировано удаление. Пользователю следует вызвать wait для этой задачи, прежде чем начинать какие-либо другие параллельные вызовы.

Примеры

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
interrupt(pids::Integer...)

Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.


interrupt(pids::AbstractVector=workers())

Прерывает выполняемую в данный момент задачу в указанных рабочих процессах. Это равноценно нажатию на локальном компьютере клавиш Ctrl-C. При отсутствии аргументов прерываются все рабочие процессы.

myid()

Получает идентификатор текущего процесса.

Примеры

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

Преобразует коллекцию c, применяя f к каждому элементу с использованием доступных рабочих процессов и задач.

Если указано несколько аргументов коллекции, f применяется поэлементно.

Учтите, что необходимо обеспечить доступность f для всех рабочих процессов. Дополнительные сведения см. в разделе Доступность кода и загрузка пакетов.

Если не указать пул рабочих процессов, будут использованы все доступные процессы, т. е. пул CachingPool.

По умолчанию pmap распределяет вычисления по всем указанным рабочим процессам. Чтобы использовать только локальные процессы и распределить нагрузку по задачам, укажите distributed=false. Это равноценно использованию asyncmap. Например, вызов pmap(f, c; distributed=false) эквивалентен asyncmap(f,c; ntasks=()->nworkers()).

Функция pmap также может использовать сочетание процессов и задач с помощью аргумента размера пакетов (batch_size). При размере пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size или меньше. Пакет отправляется в виде одного запроса свободному рабочему процессу, и затем локальная функция asyncmap обрабатывает элементы пакета с помощью множества параллельных задач.

При возникновении какой-либо ошибки функция pmap останавливает обработку оставшейся части коллекции. Чтобы этого не происходило, вы можете использовать аргумент on_error и задать функцию обработки ошибок, которая принимает единственный аргумент, а именно исключение. Эта функция может повторно выдавать ошибку и прерывать обработку либо продолжать работу, возвращая любое значение в inline-режиме вместе с результатами вызывающему объекту.

Рассмотрим следующие два примера кода. В первом примере объект исключения возвращается в inline-режиме, а во втором вместо любого исключения возвращается 0:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Для обработки ошибок можно также повторять невыполненные вычисления. Именованные аргументы retry_delays и retry_check передаются в retry в качестве именованных аргументов delays и check соответственно. Если задана пакетная обработка и происходит сбой всего пакета, все элементы из пакета будут обрабатываться повторно.

Учтите, что если указать и on_error, и retry_delays, то сначала вызывается перехватчик on_error, а затем выполняется повтор. Если on_error не выдает (или не выдает повторно) исключение, обработка элемента не повторяется.

Пример: повторное выполнение f для элемента максимум три раза без какой-либо паузы при наличии ошибок.

pmap(f, c; retry_delays = zeros(3))

Пример: Повторное выполнение f только при исключениях, не относящихся к типу InexactError, с экспоненциальным увеличением задержки максимум три раза, а для всех случаев InexactError — возврат на месте значения NaN.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
RemoteException(captured)

Фиксирует исключения в удаленных вычислениях и повторно выдает их локально. RemoteException является оболочкой для идентификатора рабочего процесса (pid) и записанного исключения. В CapturedException записывается удаленное исключение и стек вызовов в сериализованной форме на тот момент, когда исключение возникло.

ProcessExitedException(worker_id::Int)

После завершения клиентского процесса Julia дальнейшие попытки обращения к недостижимому дочернему объекту будут приводить к выдаче исключения.

Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Тип Future служит для хранения одного вычисления с неизвестным состоянием и временем завершения. Для множества возможных вычислений см. RemoteChannel. Для идентификации AbstractRemoteRef см. remoteref_id.

RemoteChannel(pid::Integer=myid())

Создает ссылку на Channel{Any}(1) для процесса pid. По умолчанию pid — это текущий процесс.

RemoteChannel(f::Function, pid::Integer=myid())

Создает ссылки на удаленные каналы определенного размера и типа. f — это функция, которая при выполнении для процесса pid должна возвращать реализацию AbstractChannel.

Например, RemoteChannel(()->Channel{Int}(10), pid) возвращает ссылку на канал типа Int с размером 10 для процесса pid.

По умолчанию pid — это текущий процесс.

fetch(x::Future)

Ожидает и получает значение типа Future. Полученное значение помещается в локальный кэш. Последующие вызовы fetch по той же ссылке будут возвращать кэшированное значение. Если удаленное значение соответствует исключению, то выдается исключение RemoteException, в котором записаны удаленное исключение и обратная трассировка.

fetch(c::RemoteChannel)

Ожидает и получает значение из RemoteChannel. Выдает такие же исключения, как и для Future. Полученный элемент не удаляется.


fetch(x::Any)

Возвращает x.

remotecall(f, id::Integer, args...; kwargs...) -> Future

Вызывает функцию f в асинхронном режиме для заданных аргументов в указанном процессе. Возвращает Future. Именованные аргументы (при наличии) передаются функции f.

remotecall_wait(f, id::Integer, args...; kwargs...)

Выполняет более быстрое ожидание wait(remotecall(...)) в одном сообщении рабочего процесса (Worker), задаваемого идентификатором id. Именованные аргументы (при наличии) передаются функции f.

См. также описание wait и remotecall.

remotecall_fetch(f, id::Integer, args...; kwargs...)

Выполняет fetch(remotecall(...)) в одном сообщении. Именованные аргументы (при наличии) передаются функции f. Любые удаленные исключения фиксируются в RemoteException и затем выдаются.

См. также описание fetch и remotecall.

Примеры

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
remote_do(f, id::Integer, args...; kwargs...) -> nothing

Выполняет функцию f в рабочем процессе id в асинхронном режиме. В отличие от remotecall, этот метод не сохраняет результат вычисления, а также не позволяет ожидать завершения.

Успешный вызов показывает, что запрос был принят к выполнению на удаленном узле.

Последовательные вызовы remotecall для одного рабочего процесса сериализируются в порядке их осуществления, однако порядок выполнения на удаленном узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) сериализирует вызовы f1, а затем f2 и f3 в таком порядке. Однако выполнение f1 перед f3 в рабочем процессе 2 не гарантируется.

Выдаваемые f исключения выводятся в поток stderr удаленного процесса.

Именованные аргументы (при наличии) передаются функции f.

put!(rr::RemoteChannel, args...)

Сохраняет набор значений в канале RemoteChannel. Если канал заполнен, метод блокирует работу, пока не появится доступное пространство. Возвращается первый аргумент.

put!(rr::Future, v)

Сохраняет значение в объект rr типа Future. Объекты Future представляют собой однократно записываемые удаленные ссылки. При использовании put! для уже заданного Future выдается исключение (Exception). Все асинхронные удаленные вызовы возвращают объекты Future и присваивают им возвращаемые значения вызова по его завершении.

take!(rr::RemoteChannel, args...)

Получает значения из удаленного канала RemoteChannel rr, удаляя эти значения в процессе.

isready(rr::RemoteChannel, args...)

Определяет, имеет ли RemoteChannel значение, хранимое в нем. Учтите, что эта функция может вызывать состояние гонки, так как к моменту получения результата он может уже не соответствовать истине. Однако ее можно безопасно использовать с объектами Future, так как значения им присваиваются только один раз.

isready(rr::Future)

Определяет, имеет ли Future значение, хранимое в нем.

Если аргумент Future принадлежит другому узлу, вызов блокирует работу, ожидая ответа. Вместо этого рекомендуется ожидать rr в отдельной задаче или использовать промежуточный локальный канал (Channel):

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # блокировки не будет.
AbstractWorkerPool

Супертип для пулов рабочих процессов, таких как WorkerPool и CachingPool. Для пула AbstractWorkerPool необходимо реализовать следующее:

  • push! - add a new worker to the overall pool (available + busy)

  • put! - put back a worker to the available pool

  • take! - take a worker from the available pool (to be used for remote function execution)

  • length - number of workers available in the overall pool

  • isready - return false if a take! on the pool would block, else true

Используемые по умолчанию реализации вышеуказанных методов (для AbstractWorkerPool) требуют указания следующих полей:

  • channel::Channel{Int}

  • workers::Set{Int}, где поле channel содержит идентификаторы свободных рабочих процессов, а workers — набор всех процессов, связанных с данным пулом.

WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Создает пул WorkerPool на основе вектора или диапазона идентификаторов рабочих процессов.

Примеры

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
CachingPool(workers::Vector{Int})

Представляет собой реализацию AbstractWorkerPool. remote, remotecall_fetch, pmap (и другие удаленные вызовы для удаленного выполнения функций) работают эффективнее при кэшировании сериализируемых и десериализируемых функций в рабочих узлах, особенно при использовании замыканий (способных записывать большой объем данных).

Удаленный кэш поддерживается в течение всего времени существования возвращаемого объекта CachingPool. Для более ранней очистки кэша используйте clear!(pool).

При использовании глобальных переменных в замыкании записываются только привязки, а не сами данные. Для записи глобальных данных можно использовать блоки let.

Примеры

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

В примере выше foo передается каждому рабочему процессу только один раз.

default_worker_pool()

Пул AbstractWorkerPool, содержащий неактивные рабочие процессы workers и используемый для remote(f) и pmap (по умолчанию). Если пул рабочих процессов по умолчанию не задан явно с помощью default_worker_pool!(pool), в качестве пула по умолчанию будет инициализирован WorkerPool.

Примеры

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
clear!(syms, pids=workers(); mod=Main)

Очищает в модулях глобальные привязки, инициализируя их как nothing. Параметр syms должен быть значением типа Symbol или коллекцией значений Symbol. pids и mod указывают процессы и модуль, в которых глобальные переменные будут инициализированы повторно. Будут очищены только имена, определения которых обнаружены в mod.

При запросе об очистке глобальной константы будет выдано исключение.


clear!(pool::CachingPool) -> pool

Удаляет все кэшированные функции для всех участвующих рабочих процессов.

remote([p::AbstractWorkerPool], f) -> Function

Возвращает анонимную функцию, которая выполняет функцию f в доступном рабочем процессе (взятом из пула p типа WorkerPool, если такой пул предоставлен) с использованием remotecall_fetch.

remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

Вариант remotecall(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

В этом примере задача выполнялась в процессе pid 2, вызванном из pid 1.

remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

Вариант remotecall_wait(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall_wait.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

Вариант remotecall_fetch(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remotecall_fetch.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

Вариант remote_do(f, pid, ....) с использованием пула WorkerPool. Ожидает и забирает из pool свободный рабочий процесс, а затем выполняет в нем remote_do.

@spawn expr

Создает вокруг выражения замыкание и выполняет это замыкание в автоматически выбранном процессе, возвращая Future в качестве результата. Этот макрос устарел; вместо него следует использовать @spawnat :any expr.

Примеры

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Совместимость: Julia 1.3

Начиная с версии Julia 1.3 этот макрос устарел. Используйте вместо него @spawnat :any.

@spawnat p expr

Создает вокруг выражения замыкание и выполняет это замыкание асинхронно в процессе p. Возвращает Future в качестве результата. Если указать для p в кавычках символьный литерал :any, система выберет используемый обработчик автоматически.

Примеры

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Совместимость: Julia 1.3

Аргумент :any впервые реализован в Julia 1.3.

@fetch expr

Эквивалентно fetch(@spawnat :any expr). См. описание fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
@fetchfrom

Эквивалентно fetch(@spawnat p expr). См. описание fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4
@distributed

Распределяет в параллельном режиме память для цикла в следующей форме:

@distributed [reducer] for var = range body end

Указанный диапазон разбивается на секции и используется для локального выполнения во всех рабочих процессах. Если задана функция-редуктор (reducer), то @distributed выполняет локальные преобразования для каждого рабочего процесса и итоговое преобразование для вызывающего процесса.

Учтите, что без функции-редуктора макрос @distributed выполняется асинхронно, т. е. порождает независимые задачи во всех рабочих процессах и немедленно возвращает результат, не ожидая завершения. Чтобы дождаться завершения, поставьте перед вызовом префикс @sync, например:

@sync @distributed for var = range body end
@everywhere [procs()] expr

Выполняет выражение в Main во всех процессах procs. Возникающие в процессах ошибки собираются в исключение CompositeException и затем выдаются. Например:

@everywhere bar = 1

Это выражение определяет Main.bar для всех текущих процессов. Для добавляемых позже процессов (например, с помощью addprocs()) это выражение определено не будет.

В отличие от @spawnat, макрос @everywhere не записывает никакие локальные переменные. Вместо этого их можно транслировать с использованием интерполяции:

foo = 1 @everywhere bar = $foo

Необязательный аргумент procs позволяет задать подмножество всех процессов, которым необходимо выполнить выражение.

Это эквивалентно вызову remotecall_eval(Main, procs, expr) с двумя дополнительными особенностями:

- `using` and `import` statements run on the calling process first, to ensure

  предварительную компиляцию пакетов. - Текущий путь к файлу исходного кода, используемый `include`, распространяется на другие процессы.
remoteref_id(r::AbstractRemoteRef) -> RRID

Объекты Future и RemoteChannel можно идентифицировать по следующим полям:

  • where — узел, где фактически существует используемый объект или хранилище, указываемые по ссылке.

  • whence — узел, с которого была создана удаленная ссылка. Обратите внимание, что это не тот узел, на котором фактически существует указываемый по ссылке объект. Например, при вызове RemoteChannel(2) из главного процесса мы получим для where значение 2, а для whence — 1.

  • id — уникальный идентификатор среди всех ссылок, созданных с узла рабочего процесса, указанного в whence.

В совокупности whence и id уникальным образом идентифицируют ссылку среди всех рабочих процессов.

Функция remoteref_id представляет собой низкоуровневый API-интерфейс, который возвращает объект RRID, являющийся оболочкой для значений whence и id удаленной ссылки.

channel_from_id(id) -> c

Низкоуровневый API-интерфейс, который возвращает вспомогательный канал AbstractChannel для идентификатора id, возвращаемого функцией remoteref_id. Этот вызов действителен только на узле, где вспомогательный канал существует.

worker_id_from_socket(s) -> pid

Низкоуровневый API-интерфейс, который принимает IO-подключение или рабочий процесс Worker и возвращает идентификатор pid подключенного к ним рабочего процесса. Это бывает полезно при написании для типа особых методов serialize, которые оптимизируют выводимые данные в зависимости от идентификатора принимающего процесса.

cluster_cookie() -> cookie

Возвращает cookie-файл кластера.

cluster_cookie(cookie) -> cookie

Задает принимаемый cookie-файл в качестве cookie кластера и возвращает этот файл.

Cluster Manager Interface

This interface provides a mechanism to launch and manage Julia workers on different cluster environments. There are two types of managers present in Base: LocalManager, for launching additional workers on the same host, and SSHManager, for launching on remote hosts via ssh. TCP/IP sockets are used to connect and transport messages between processes. It is possible for Cluster Managers to provide a different transport.

ClusterManager

Супертип для диспетчеров кластеров, которые управляют рабочими процессами как кластером. Диспетчеры кластеров реализуют способы добавления, удаления и взаимодействия для рабочих процессов. Подтипами данного типа являются SSHManager и LocalManager.

WorkerConfig

Тип, используемый диспетчерами кластеров (ClusterManager) для управления добавляемыми в кластеры рабочими процессами. Некоторые поля используются всеми диспетчерами кластеров для доступа к узлу:

  • io — подключение, используемое для доступа к рабочему процессу (подтип IO или Nothing).

  • host — адрес узла (String либо Nothing).

  • port — используемый на узле порт для подключения к рабочему процессу (Int либо Nothing).

Некоторые поля используются диспетчером кластеров для добавления рабочих процессов к уже инициализированному узлу:

  • count — число рабочих процессов, которые нужно запустить на узле.

  • exename — путь к исполняемому файлу Julia на узле, по умолчанию "$(Sys.BINDIR)/julia", или "$(Sys.BINDIR)/julia-debug".

  • exeflags — флаги, используемые при удаленном запуске Julia.

Поле userdata используется внешними диспетчерами для хранения данных о каждом рабочем процессе.

Некоторые поля используются SSHManager и подобными диспетчерами:

  • tunnel — может иметь значение true (использовать туннелирование), false (не использовать туннелирование) или nothing (использовать режим по умолчанию диспетчера).

  • multiplex — может иметь значение true (использовать для туннелирования мультиплексирование SSH) или false.

  • forward — параметр переадресации, используемый для параметра -L в SSH.

  • bind_addr — адрес для привязки на удаленном узле.

  • sshflags — флаги, используемые при установке SSH-соединения.

  • max_parallel — максимальное число рабочих процессов, к которым параллельно осуществляется подключение на узле.

Некоторые поля используются как локальными диспетчерами (LocalManager), так и диспетчерами SSHManager:

  • connect_at — определяет, используется ли данный вызов для настройки подключения между рабочими процессами или между рабочим процессом и главным.

  • process — процесс, который будет подключен (как правило, диспетчер назначает его при выполнении addprocs).

  • ospid — идентификатор процесса согласно ОС узла, используемый для прерывания рабочих процессов.

  • environ — закрытый словарь, используемый локальными диспетчерами или диспетчерами SSH для хранения временных данных.

  • ident — рабочий процесс, определяемый диспетчером кластеров (ClusterManager).

  • connect_idents — список идентификаторов процессов, к которым должен подключаться рабочий процесс при использовании настраиваемой топологии.

  • enable_threaded_blas — задает для рабочих процессов использование BLAS в многопоточном режиме (возможные значения: true, false или nothing).

launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Реализуется в диспетчерах кластеров. Для каждого запускаемого этой функцией рабочего процесса Julia она должна добавлять в массив launched запись WorkerConfig и отправлять уведомление launch_ntfy. Функция ОБЯЗАТЕЛЬНО должна завершать работу, когда все запрошенные диспетчером manager функции будут запущены. params — это словарь всех именованных аргументов, с которыми вызывалась функция addprocs.

manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Реализуется в диспетчерах кластеров. Эта функция вызывается в главном процессе во время существования рабочего процесса с необходимыми значениями op:

  • Со значением :register или :deregister при добавлении или удалении процесса в пуле рабочих процессов Julia.

  • Со значением :interrupt при вызове interrupt(workers). Диспетчер ClusterManager должен подать соответствующему рабочему процессу сигнал прерывания.

  • Со значением :finalize для целей очистки.

kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Реализуется в диспетчерах кластеров. Этот метод вызывается rmprocs в главном процессе. Он должен завершать работу удаленного процесса, задаваемого pid. Метод kill(manager::ClusterManager.....) удаленно выполняет exit() в pid.

connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Реализуется в диспетчерах кластеров, использующих особые транспортные протоколы. Метод должен устанавливать логическое подключение к рабочему процессу с идентификатором pid, задаваемое в config, и возвращать пару объектов IO. Сообщения от pid к текущему процессу будут считываться из потока instrm, а сообщения, отправляемые процессу pid, — записываться в outstrm. Реализация особого транспортного протокола должна обеспечивать доставку и получение сообщений полностью и в правильном порядке. Метод connect(manager::ClusterManager.....) настраивает подключение сокетов TCP/IP между рабочими процессами.

init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Вызывается диспетчерами кластеров, реализующими особые транспортные протоколы. Эта функция инициализирует только что запущенный процесс в качестве рабочего процесса. Аргумент командной строки --worker[=<cookie>] обеспечивает инициализацию процесса как рабочего с использованием в качестве транспортного протокола сокетов TCP/IP. В качестве cookie используется файл cluster_cookie.

start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker — это внутренняя функция, являющаяся точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс в качестве рабочего процесса кластера Julia.

Данные host:port записываются в поток out (по умолчанию stdout).

При необходимости функция считывает cookie-файл из stdin, ожидает передачи данных на незанятом порту (либо на порту, который задается параметром командной строки --bind-to) и планирует выполнение задач по обработке входящих TCP-подключений и запросов. Эта функция также может (необязательно) закрывать stdin и перенаправлять поток stderr в stdout.

Она не возвращает никаких значений.

process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Вызывается диспетчерами кластеров, использующими особые транспортные протоколы. Эта функция должна вызываться, когда особая реализация транспортного протокола получает первое сообщение от удаленного рабочего процесса. Настроенный протокол должен обеспечить логическое подключение к удаленному процессу и предоставить два объекта IO: один для входящих сообщений, а другой для сообщений, адресуемых удаленному процессу. Если incoming имеет значение true, соединение инициировано удаленным одноранговым узлом. Тот узел в паре, который инициировал соединение, передает cookie-файл кластера и свой номер версии Julia для согласования подключения и проверки подлинности.

См. также описание cluster_cookie.

default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Реализуется в диспетчерах кластеров. Именованные параметры по умолчанию, передаваемые при вызове addprocs(mgr). Минимальный набор параметров доступен при вызове default_addprocs_params().