Многопоточность
#
Base.Threads.@threads
— Macro
Threads.@threads [schedule] for ... end
Макрос, исполняющий цикл for
в параллельном режиме. Пространство итерации распределяется среди крупномодульных задач. Эту политику можно задать аргументом schedule
. Исполнение цикла ожидает оценки всех итераций.
См. также описание @spawn
и pmap
в Distributed
.
Расширенная справка
Семантика
За исключением случаев, когда более твердые гарантии определены параметром настройки расписания, цикл, исполняемый макросом @threads
, имеет следующую семантику.
Макрос @threads
исполняет тело цикла в неустановленном порядке и, возможно, в многопоточном режиме. Он не задает точные присваивания заданий и потоки рабочей роли. Для каждого исполнения присваивания могут отличаться. Код тела цикла (включая весь код, транзитивно вызываемый из него) не должен делать допущения относительно распределения итераций по заданиям или потока рабочей роли, в котором они исполняются. Тело цикла для каждой итерации должно иметь возможность развиваться прогрессивно независимо от остальных итераций, и гонка по данным должна быть исключена. Таким образом, недопустимые синхронизации в различных итерациях могут взаимоблокироваться, а несинхронизированные сеансы доступа к памяти могут привести к неопределенному поведению.
Например, приведенные выше условия предполагают следующее.
-
Блокировка, установленная в итерации, должна быть снята в пределах той же итерации.
-
Взаимодействие между итерациями с использованием примитивов, таких как
Channel
, является некорректным. -
Запись выполняется только в расположения, которые не используются совместно различными итерациями (за исключением случаев, когда используется блокировка или атомарная операция).
-
Если не используется расписание
:static
, значениеthreadid()
может изменяться даже в пределах одной итерации. См. описаниеTask Migration
.
Планировщики
Без аргумента планировщика точное создание расписания настроить нельзя, и оно может варьироваться в различных выпусках Julia. В настоящее время используется :dynamic
, если планировщик не задан.
Совместимость: Julia 1.5
Аргумент |
:dynamic
(по умолчанию)
Планировщик :dynamic
исполняет итерации динамически для доступных потоков рабочей роли. Текущая реализация предполагает, что рабочая нагрузка для каждой итерации является однородной. Однако такое допущение можно устранить в будущем.
Этот параметр планирования представляет собой просто подсказку для базового механизма исполнения. Однако при этом можно ожидать ряда свойства. Количество Task
, используемых планировщиком :dynamic
, привязано к небольшой постоянной величине, равной количеству доступных потоков рабочей роли (Threads.threadpoolsize()
). Каждое задание обрабатывает сплошные регионы пространства итерации. Таким образом, @threads :dynamic for x in xs; f(x); end
, как правило, более эффективен, чем @sync for x in xs; @spawn f(x); end
, если length(xs)
значительно превышает количество потоков рабочей роли и время выполнения f(x)
относительно меньше стоимости порождения и синхронизации задания (как правило, менее 10 миллисекунд).
Совместимость: Julia 1.8
Доступен параметр |
:static
Планировщик :static
создает одно задание на поток и разделяет итерации поровну между ними, присваивая каждое конкретное задание отдельно каждому потоку. В частности, значение threadid()
гарантированно будет константой в пределах одной итерации. Указание :static
является ошибочным, если используется из другого цикла @threads
или потока, отличного от 1.
Планирование |
Пример
Чтобы продемонстрировать различные стратегии планирования, можно рассмотреть следующую функцию busywait
, которая содержит нерезультативный цикл заданной продолжительности, выполняемый в течение определенного количества секунд.
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
В примере :dynamic
задан период продолжительностью 2 секунды, поскольку один из незанятых потоков может выполнять две 1-секундные итерации, чтобы завершить цикл.
#
Base.Threads.foreach
— Function
Threads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())
Аналогично foreach(f, channel)
, но итерация выполняется по channel
, и вызовы к f
разделены по заданиям ntasks
, порожденным Threads.@spawn
. Эта функция будет ожидать завершения внутренне порождаемых заданий до возврата.
Если schedule isa FairSchedule
, то Threads.foreach
попытается породить задания таким способом, который позволит планировщику Julia более свободно балансировать нагрузку для рабочих операций в различных потоках. Такой подход в целом предполагает более высокие накладные расходы на операцию, но обеспечивает более высокую эффективность по сравнению с StaticSchedule
в параллельном режиме при наличии других многопотоковых рабочих нагрузок.
Если schedule isa StaticSchedule
, то Threads.foreach
порождает задания таким способом, который предполагает более низкие накладные расходы на операцию, чем FairSchedule
, но менее пригоден для балансировки нагрузки. Следовательно, такой подход может лучше подходить для более точных, однородных рабочих нагрузок, но при этом будет менее эффективным, чем FairSchedule
, в параллельном режиме при наличии других многопотоковых рабочих нагрузок.
Примеры
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
f = i -> put!(ch, i^2)
Threads.foreach(f, c)
end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Совместимость: Julia 1.6
Для этой функции требуется версия не ниже Julia 1.6. |
#
Base.Threads.@spawn
— Macro
Threads.@spawn [:default|:interactive] expr
Создает Task
и schedule
его, чтобы запустить в любом доступном потоке в указанном пуле потоков (:default
, если не указано). Задача выделяется в поток после того, как станет доступной. Чтобы дождаться завершения задачи, вызовите wait
в результате этого макроса или вызовите fetch
, чтобы дождаться завершения и затем получить возвращаемое значение.
Значения можно интерполировать в @spawn
с помощью $
, который копирует значение непосредственно в сконструированное базовое закрытие. Это позволит вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.
Поток, в котором выполняется задача, может измениться, если задача отдает управление, поэтому |
Совместимость: Julia 1.3
Этот макрос впервые реализован в Julia 1.3. |
Совместимость: Julia 1.4
Интерполяция значений с помощью |
Совместимость: Julia 1.9
Пул потоков можно указывать начиная с версии Julia 1.9. |
#
Base.Threads.threadid
— Function
Threads.threadid() -> Int
Получает идентификационный номер текущего потока выполнения. Главный поток имеет идентификатор 1
.
Примеры
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4
Поток, в котором выполняется задача, может измениться, если задача отдает управление. Это называется |
#
Base.Threads.maxthreadid
— Function
Threads.maxthreadid() -> Int
Получает нижнюю границу числа доступных потоков (по всем пулам потоков), доступных процессу Julia, с семантикой atomic-acquire. Результат всегда будет больше или равен threadid()
, а также threadid(task)
для любой задачи, которую вы смогли наблюдать до вызова maxthreadid
.
#
Base.Threads.nthreads
— Function
Threads.nthreads(:default | :interactive) -> Int
Получает текущее количество потоков в указанном пуле потоков. Потоки в пуле по умолчанию имеют идентификационные номера 1:nthreads(:default)
.
См. также описание BLAS.get_num_threads
и BLAS.set_num_threads
в стандартной библиотеке LinearAlgebra
и nprocs()
в стандартной библиотеке Distributed
и Threads.maxthreadid()
.
#
Base.Threads.threadpool
— Function
Threads.threadpool(tid = threadid()) -> Symbol
Возвращает пул потоков указанного потока — либо :default
, либо :interactive
.
#
Base.Threads.nthreadpools
— Function
Threads.nthreadpools() -> Int
Возвращает количество текущих настроенных пулов потоков.
#
Base.Threads.threadpoolsize
— Function
Threads.threadpoolsize(pool::Symbol = :default) -> Int
Получает количество потоков, доступных пулу потоков по умолчанию (или указанному пулу потоков).
См. также описание BLAS.get_num_threads
и BLAS.set_num_threads
в стандартной библиотеке LinearAlgebra
и nprocs()
в стандартной библиотеке Distributed
.
См. также раздел Многопоточность.
Атомарные операции
#
Base.@atomic
— Macro
@atomic var
@atomic order ex
var
или ex
помечается как выполняемый атомарно, если ex
представляет собой поддерживаемое выражение. Если значение order
не указано, по умолчанию используется :sequentially_consistent.
@atomic a.b.x = new @atomic a.b.x += addend @atomic :release a.b.x = new @atomic :acquire_release a.b.x += addend
Выполняет операцию хранения, атомарно выраженную в правой части, и возвращает новое значение.
При использовании =
эта операция преобразуется в вызов setproperty!(a.b, :x, new)
. При использовании любого оператора эта операция преобразуется в вызов modifyproperty!(a.b, :x, +, addend)[2]
.
@atomic a.b.x max arg2 @atomic a.b.x + arg2 @atomic max(a.b.x, arg2) @atomic :acquire_release max(a.b.x, arg2) @atomic :acquire_release a.b.x + arg2 @atomic :acquire_release a.b.x max arg2
Выполняет двоичную операцию, атомарно выраженную в правой части. Сохраняет результат в поле в первом аргументе и возвращает значения (old, new)
.
Эта операция преобразуется в вызов modifyproperty!(a.b, :x, func, arg2)
.
Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.
Примеры
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
1
julia> @atomic :sequentially_consistent a.x = 2 # задает поле х со значением а, с последовательной согласованностью
2
julia> @atomic a.x += 1 # пошагово увеличивает поле х со значением а, с последовательной согласованностью
3
julia> @atomic a.x + 1 # пошагово увеличивает поле х со значением а, с последовательной согласованностью
3 => 4
julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
4
julia> @atomic max(a.x, 10) # изменяет поле х со значением а на максимальное значение, с последовательной согласованностью
4 => 10
julia> @atomic a.x max 5 # снова изменяет поле х со значением а на максимальное значение, с последовательной согласованностью
10 => 10
Совместимость: Julia 1.7
Для этой функции требуется версия не ниже Julia 1.7. |
#
Base.@atomicswap
— Macro
@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new
Сохраняет new
в a.b.x
и возвращает прежнее значение a.b.x
.
Эта операция преобразуется в вызов swapproperty!(a.b, :x, new)
.
Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.
Примеры
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # заменяет поле х со значением а на значение 4, с последовательной согласованностью
1
julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
4
Совместимость: Julia 1.7
Для этой функции требуется версия не ниже Julia 1.7. |
#
Base.@atomicreplace
— Macro
@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired
Выполняет условную замену, атомарно выраженную парой, возвращая значения (old, success::Bool)
. Где success
указывает, выполнялась ли замена.
Эта операция преобразуется в вызов replaceproperty!(a.b, :x, expected, desired)
.
Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.
Примеры
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # заменяет поле х со значением а на значение 2 (если оно было равно 1), с последовательной согласованностью
(old = 1, success = true)
julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
2
julia> @atomicreplace a.x 1 => 2 # заменяет поле х со значением а на значение 2 (если оно было равно 1), с последовательной согласованностью
(old = 2, success = false)
julia> xchg = 2 => 0; # заменяет поле х со значением а на значение 0 (если оно было равно 2), с последовательной согласованностью
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
0
Совместимость: Julia 1.7
Для этой функции требуется версия не ниже Julia 1.7. |
Следующие интерфейсы API достаточно примитивны и, скорее всего, будут доступны через оболочку наподобие |
Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)
Следующие интерфейсы API устарели, хотя, скорее всего, будут поддерживаться еще в нескольких выпусках. |
#
Base.Threads.Atomic
— Type
Threads.Atomic{T}
Содержит ссылку на объект типа T
, разрешая только мгновенный доступ, то есть потокобезопасным способом.
Только определенные «простые» типы можно использовать атомарно, в частности примитивный логический тип, целочисленный тип и тип с плавающей запятой. Это Bool
, Int8
…Int128
, UInt8
…UInt128
и Float16
…Float64
.
Новые атомарные объекты можно создавать на основе неатомарных значений; если они не заданы, атомарный объект инициализируется с нулевым значением.
Доступ к атомарным объектам можно осуществлять с помощью нотации []
:
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
Атомарные операции используют префикс atomic_
, такой как atomic_add!
, atomic_xchg!
и т. д.
#
Base.Threads.atomic_cas!
— Function
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
Атомарно выполняет операцию сравнения с обменом x
Атомарно сравнивает значение в x
с cmp
. Если они равны, выполняется запись newval
в x
. В ином случае x
остается без изменений. Возвращает прежнее значение в x
. Сравнивая возвращаемое значение с cmp
(посредством ===
), можно узнать, изменялся ли x
и содержит ли он теперь новое значение newval
.
Дополнительные сведения см. в инструкции LLVM cmpxchg
.
Эту функцию можно использовать для реализации транзакционной семантики. До транзакции выполняется запись значения в x
. После выполнения транзакции новое значение сохраняется только в том случае, если x
не изменен за этот период.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
#
Base.Threads.atomic_xchg!
— Function
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T
Атомарно изменяет значение в x
Атомарно изменяет значение в x
на newval
. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw xchg
.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
#
Base.Threads.atomic_add!
— Function
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
Атомарно добавляет val
в x
Выполняет x[] += val
атомарным образом. Возвращает прежнее значение. Не определено для Atomic{Bool}
.
Дополнительные сведения см. в инструкции LLVM atomicrmw add
.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
#
Base.Threads.atomic_sub!
— Function
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
Атомарно вычитает val
из x
Выполняет x[] -= val
атомарным образом. Возвращает прежнее значение. Не определено для Atomic{Bool}
.
Дополнительные сведения см. в инструкции LLVM atomicrmw sub
.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
#
Base.Threads.atomic_and!
— Function
Threads.atomic_and!(x::Atomic{T}, val::T) where T
Атомарно выполняет битовую операцию and x
с val
.
Выполняет x[] &= val
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw and
.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
#
Base.Threads.atomic_nand!
— Function
Threads.atomic_nand!(x::Atomic{T}, val::T) where T
Атомарно выполняет битовую операцию nand (not-and) x
с val
Выполняет x[] = ~(x[] & val)
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw nand
.
Примеры
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
#
Base.Threads.atomic_or!
— Function
Threads.atomic_or!(x::Atomic{T}, val::T) where T
Атомарно выполняет битовую операцию or x
с val
.
Выполняет x[] |= val
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw or
.
Примеры
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
#
Base.Threads.atomic_xor!
— Function
Threads.atomic_xor!(x::Atomic{T}, val::T) where T
Атомарно выполняет битовую операцию xor (exclusive-or) x
с val
Выполняет x[] $= val
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw xor
.
Примеры
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
#
Base.Threads.atomic_max!
— Function
Threads.atomic_max!(x::Atomic{T}, val::T) where T
Атомарно сохраняет максимальное значение x
и val
в x
Выполняет x[] = max(x[], val)
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw max
.
Примеры
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
#
Base.Threads.atomic_min!
— Function
Threads.atomic_min!(x::Atomic{T}, val::T) where T
Атомарно сохраняет минимальное значение x
и val
в x
Выполняет x[] = min(x[], val)
атомарным образом. Возвращает прежнее значение.
Дополнительные сведения см. в инструкции LLVM atomicrmw min
.
Примеры
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
#
Base.Threads.atomic_fence
— Function
Threads.atomic_fence()
Вставляет последовательно согласованный барьер памяти
Вставляет барьер памяти с последовательно согласованной семантикой упорядочивания. При необходимости доступны алгоритмы, то есть в тех случаях, когда упорядочивание получения/выпуска является недостаточным.
Это, скорее всего, весьма дорогая операция. Учитывая, что все остальные атомарные операции в Julia уже располагают семантикой получения/выпуска, явные барьеры в большинстве случаев не требуются.
Дополнительные сведения см. в инструкции LLVM fence
.
Вызов ccall с использованием пула потоков libuv (экспериментальная функция)
#
Base.@threadcall
— Macro
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
Макрос @threadcall
вызывается так же, как и ccall
, но он выполняет свои задачи в другом потоке. Рекомендуется использовать, если требуется вызвать блокирующую функцию С без блокирования текущего потока julia
. Многопоточный режим ограничивается размером пула потока libuv, который по умолчанию включает 4 потока, но этот лимит можно увеличить, задав переменную среды UV_THREADPOOL_SIZE
и перезапустив процесс julia
.
Обратите внимание, что вызванная функция не должна выполнять обратный вызов к Julia.
Примитивы низкоуровневой синхронизации
Эти стандартные блоки используются для создания объектов регулярной синхронизации.
#
Base.Threads.SpinLock
— Type
SpinLock()
Создает нереентерабельную активную блокировку «проверить — проверить — установить». Рекурсивное использование приводит к взаимоблокировке. Такой тип блокировки можно использовать только для кода, выполнение которого не отнимает много времени и который не блокируется (например, при выполнении ввода-вывода). В целом вместо нее можно использовать ReentrantLock
.
Каждый lock
должен сопоставляться с unlock
. Если !islocked(lck::SpinLock)
удерживается, trylock(lck)
выполняется успешно, если только нет других задач, пытающихся удержать блокировку в то же самое время.
Активные блокировки «проверить — проверить — установить» являются самыми быстрыми при наличии примерно 30 конкурирующих потоков. Если конкурирующих потоков больше, следует выбрать другие методы синхронизации.