Параллелизм
Многопоточность и распределенные вычисления
В Julia есть два основных типа параллелизма:
-
Многопоточность
-
Распределенные вычисления
В многопоточном режиме несколько задач выполняются в одном процессе Julia и совместно используют одно и то же пространство памяти. При распределенных вычислениях задачи выполняются в нескольких процессах Julia с независимыми пространствами памяти. Это могут быть процессы на нескольких физических компьютерах, например в высокопроизводительном вычислительном кластере.
Для правильного выбора типа параллелизма важно понимать его особенности, так как код для каждого типа будет отличаться и у каждого подхода есть свои ограничения и преимущества. Однако выбор во многом зависит от решаемой задачи, поэтому может потребоваться поэкспериментировать с обоими подходами, чтобы определить наиболее подходящий.
Многопоточность
Чтобы использовать многопоточность в Julia, необходимо либо запустить Julia с флагом командной строки --threads=N
, либо установить переменную среды JULIA_NUM_THREADS
перед запуском Julia. В этом руководстве мы зададим переменную среды:
julia> ENV["JULIA_NUM_THREADS"]
"4"
Проверить, сколько потоков доступно, можно так:
julia> Threads.nthreads()
4
Самый простой способ использовать многопоточность в Julia — поместить макрос Threads.@threads
перед циклом for
:
julia> @time begin
ids = Int[]
my_lock = Threads.ReentrantLock()
Threads.@threads for i in 1:Threads.nthreads()
global ids, my_lock
Threads.lock(my_lock) do
push!(ids, Threads.threadid())
end
sleep(1.0)
end
end
1.037087 seconds (31.32 k allocations: 1.836 MiB, 2.02% compilation time)
Этот цикл for будет останавливаться на 1
секунду при каждой итерации. Таким образом, если бы он выполнялся последовательно, это заняло бы столько же секунд, сколько доступно потоков. Однако потребовалась всего 1 секунда, а значит, итерации выполнялись одновременно. В этом можно убедиться, проверив Threads.threadid()
потока, в котором выполнялась каждая итерация:
julia> ids
4-element Vector{Int64}:
2
4
1
3
При работе с потоками следует избегать состояний гонки, когда два потока одновременно пытаются выполнить запись в одну и ту же переменную. В приведенном выше примере мы избежали состояния гонки с помощью |
Распределенные вычисления
Для использования распределенных вычислений в Julia служит пакет Distributed
:
julia> import Distributed
Как и в случае с многопоточностью, нужно сообщить Julia, сколько процессов следует добавить. Это можно сделать либо запустив Julia с аргументом командной строки -p N
, либо используя Distributed.addprocs
:
julia> import Pkg
julia> project = Pkg.project();
julia> workers = Distributed.addprocs(4; exeflags = "--project=$(project.path)")
4-element Vector{Int64}:
2
3
4
5
Распространенной ошибкой является то, что родительская среда не загружается с |
Добавляемые процессы — это «рабочие» процессы, которые можно использовать для вычислений. Их выполнение координируется процессом с идентификатором 1
. Узнать, в каком процессе в настоящее время выполняется код, можно с помощью Distributed.myid()
.
julia> Distributed.myid()
1
Как правило, для достижения максимальной производительности следует добавлять столько процессов, сколько имеется логических ядер.
В отличие от многопоточности на основе цикла for
, распределенные вычисления расширяют функцию Julia map
до функции «параллельного сопоставления» Distributed.pmap
. Julia копирует каждый элемент в списке сопоставляемых аргументов в незанятый рабочий процесс и выполняет функцию, передавая элемент в качестве входного аргумента.
julia> function hard_work(i::Int)
sleep(1.0)
return Distributed.myid()
end
hard_work (generic function with 1 method)
julia> Distributed.pmap(hard_work, 1:4)
ERROR: On worker 2:
UndefVarError: #hard_work not defined
Stacktrace:
[...]
К сожалению, если вы попробуете выполнить этот код напрямую, то получите сообщение об ошибке On worker 2: UndefVarError: hard_work not defined
. Она возникает по той причине, что рабочим процессам, в отличие от процесса 1
, функция hard_work
неизвестна.
Чтобы исправить ошибку, нужно использовать макрос Distributed.@everywhere
, который вычисляет код в каждом процессе:
julia> Distributed.@everywhere begin
function hard_work(i::Int)
sleep(1.0)
return Distributed.myid()
end
end
Если теперь запустить функцию pmap
, она будет выполнена в каждом из рабочих процессов и это займет всего 1 секунду вместо 4:
julia> @time ids = Distributed.pmap(hard_work, 1:4)
1.202006 seconds (216.39 k allocations: 13.301 MiB, 4.07% compilation time)
4-element Vector{Int64}:
2
3
5
4
Дополнительные сведения см. в документации Julia в разделе Распределенные вычисления. |
Неправильное использование параллелизма
При сборке модели JuMP
В случае с большими задачами сборка модели в JuMP может оказаться узким местом, и, возможно, вы захотите попробовать написать код, который выполняет сборку модели в параллельном режиме, например, заключив цикл for
для добавления ограничений в Threads.@threads
.
К сожалению, сборка модели JuMP не может происходить параллельно, и попытка сделать это может привести к ошибкам или неверным результатам.
В большинстве случаев оказывается, что причиной узкого места является не JuMP, а способ построения задачи. Внеся некоторые изменения, можно добиться того, чтобы сборка модели не ограничивала производительность в процессе решения.
Нужна помощь в написании более быстрого кода? Попросите о помощи на форуме сообщества. Обязательно включите воспроизводимый пример кода. |
С одной моделью JuMP
Часто люди пытаются использовать параллелизм с одной моделью JuMP. Например, для оптимизации модели по нескольким векторам в правой части кто-то может попробовать сделать следующее.
using JuMP
import HiGHS
model = Model(HiGHS.Optimizer)
set_silent(model)
@variable(model, x)
@objective(model, Min, x)
solutions = Pair{Int,Float64}[]
my_lock = Threads.ReentrantLock()
Threads.@threads for i in 1:10
set_lower_bound(x, i)
optimize!(model)
@assert is_solved_and_feasible(model)
Threads.lock(my_lock) do
push!(solutions, i => objective_value(model))
end
end
Это не сработает и может привести к ошибке, сбою Julia или получению неверных результатов.
Правильное использование параллелизма
При использовании параллелизма с JuMP важно помнить простое правило: в каждом рабочем процессе должен быть собственный экземпляр модели JuMP.
Многопоточность
При использовании многопоточности создавайте новый экземпляр model
в каждой итерации цикла for:
julia> using JuMP
julia> import HiGHS
julia> solutions = Pair{Int,Float64}[]
julia> my_lock = Threads.ReentrantLock();
julia> Threads.@threads for i in 1:10
model = Model(HiGHS.Optimizer)
set_silent(model)
set_attribute(model, MOI.NumberOfThreads(), 1)
@variable(model, x)
@objective(model, Min, x)
set_lower_bound(x, i)
optimize!(model)
@assert is_solved_and_feasible(sudoku)
Threads.lock(my_lock) do
push!(solutions, i => objective_value(model))
end
end
julia> solutions
10-element Vector{Pair{Int64, Float64}}:
7 => 7.0
4 => 4.0
1 => 1.0
9 => 9.0
5 => 5.0
8 => 8.0
10 => 10.0
2 => 2.0
6 => 6.0
3 => 3.0
Для некоторых решателей может потребоваться ограничить количество используемых потоков одним, установив атрибут |
Распределенные вычисления
При использовании распределенных вычислений не забудьте оценить весь код во всех процессах с помощью Distributed.@everywhere
, а затем написать функцию, которая создает новый экземпляр модели при каждом вычислении:
julia> Distributed.@everywhere begin
using JuMP
import HiGHS
end
julia> Distributed.@everywhere begin
function solve_model_with_right_hand_side(i)
model = Model(HiGHS.Optimizer)
set_silent(model)
@variable(model, x)
@objective(model, Min, x)
set_lower_bound(x, i)
optimize!(model)
@assert is_solved_and_feasible(sudoku)
return objective_value(model)
end
end
julia> solutions = Distributed.pmap(solve_model_with_right_hand_side, 1:10)
10-element Vector{Float64}:
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
Другие виды параллелизма
GPU
JuMP не поддерживает программирование на GPU, и лишь немногие решатели поддерживают выполнение на GPU.
Параллелизм внутри решателя
Во многих решателях используется внутренний параллелизм. Например, такие коммерческие решатели, как Gurobi](https://github.com/jump-dev/Gurobi.jl) и CPLEX, распараллеливают поиск в методе ветвей и границ. Решатели, поддерживающие внутренний параллелизм, обычно поддерживают атрибут [MOI.NumberOfThreads
, который можно задать с помощью set_attribute
.