Документация Engee

Многопоточность

Threads.@threads [schedule] for ... end

Макрос, исполняющий цикл for в параллельном режиме. Пространство итерации распределяется среди крупномодульных задач. Эту политику можно задать аргументом schedule. Исполнение цикла ожидает оценки всех итераций.

См. также описание @spawn и pmap в Distributed.

Расширенная справка

Семантика

За исключением случаев, когда более твердые гарантии определены параметром настройки расписания, цикл, исполняемый макросом @threads, имеет следующую семантику.

Макрос @threads исполняет тело цикла в неустановленном порядке и, возможно, в многопоточном режиме. Он не задает точные присваивания заданий и потоки рабочей роли. Для каждого исполнения присваивания могут отличаться. Код тела цикла (включая весь код, транзитивно вызываемый из него) не должен делать допущения относительно распределения итераций по заданиям или потока рабочей роли, в котором они исполняются. Тело цикла для каждой итерации должно иметь возможность развиваться прогрессивно независимо от остальных итераций, и гонка по данным должна быть исключена. Таким образом, недопустимые синхронизации в различных итерациях могут взаимоблокироваться, а несинхронизированные сеансы доступа к памяти могут привести к неопределенному поведению.

Например, приведенные выше условия предполагают следующее.

  • Блокировка, установленная в итерации, должна быть снята в пределах той же итерации.

  • Взаимодействие между итерациями с использованием примитивов, таких как Channel, является некорректным.

  • Запись выполняется только в расположения, которые не используются совместно различными итерациями (за исключением случаев, когда используется блокировка или атомарная операция).

  • Если не используется расписание :static, значение threadid() может изменяться даже в пределах одной итерации. См. описание Task Migration.

Планировщики

Без аргумента планировщика точное создание расписания настроить нельзя, и оно может варьироваться в различных выпусках Julia. В настоящее время используется :dynamic, если планировщик не задан.

Совместимость: Julia 1.5

Аргумент schedule впервые реализован в Julia 1.5.

:dynamic (по умолчанию)

Планировщик :dynamic исполняет итерации динамически для доступных потоков рабочей роли. Текущая реализация предполагает, что рабочая нагрузка для каждой итерации является однородной. Однако такое допущение можно устранить в будущем.

Этот параметр планирования представляет собой просто подсказку для базового механизма исполнения. Однако при этом можно ожидать ряда свойства. Количество Task, используемых планировщиком :dynamic, привязано к небольшой постоянной величине, равной количеству доступных потоков рабочей роли (Threads.threadpoolsize()). Каждое задание обрабатывает сплошные регионы пространства итерации. Таким образом, @threads :dynamic for x in xs; f(x); end, как правило, более эффективен, чем @sync for x in xs; @spawn f(x); end, если length(xs) значительно превышает количество потоков рабочей роли и время выполнения f(x) относительно меньше стоимости порождения и синхронизации задания (как правило, менее 10 миллисекунд).

Совместимость: Julia 1.8

Доступен параметр :dynamic аргумента schedule, который используется по умолчанию, начиная с версии Julia 1.8.

:static

Планировщик :static создает одно задание на поток и разделяет итерации поровну между ними, присваивая каждое конкретное задание отдельно каждому потоку. В частности, значение threadid() гарантированно будет константой в пределах одной итерации. Указание :static является ошибочным, если используется из другого цикла @threads или потока, отличного от 1.

Планирование :static предусматривается для поддержки перехода кода, написанного в версиях до Julia 1.3. В написанных новых функциях библиотеки планирование :static не рекомендуется, поскольку функции, использующие этот параметр, нельзя вызвать из произвольных потоков рабочей роли.

Пример

Чтобы продемонстрировать различные стратегии планирования, можно рассмотреть следующую функцию busywait, которая содержит нерезультативный цикл заданной продолжительности, выполняемый в течение определенного количества секунд.

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

В примере :dynamic задан период продолжительностью 2 секунды, поскольку один из незанятых потоков может выполнять две 1-секундные итерации, чтобы завершить цикл.

Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

Аналогично foreach(f, channel), но итерация выполняется по channel, и вызовы к f разделены по заданиям ntasks, порожденным Threads.@spawn. Эта функция будет ожидать завершения внутренне порождаемых заданий до возврата.

Если schedule isa FairSchedule, то Threads.foreach попытается породить задания таким способом, который позволит планировщику Julia более свободно балансировать нагрузку для рабочих операций в различных потоках. Такой подход в целом предполагает более высокие накладные расходы на операцию, но обеспечивает более высокую эффективность по сравнению с StaticSchedule в параллельном режиме при наличии других многопотоковых рабочих нагрузок.

Если schedule isa StaticSchedule, то Threads.foreach порождает задания таким способом, который предполагает более низкие накладные расходы на операцию, чем FairSchedule, но менее пригоден для балансировки нагрузки. Следовательно, такой подход может лучше подходить для более точных, однородных рабочих нагрузок, но при этом будет менее эффективным, чем FairSchedule, в параллельном режиме при наличии других многопотоковых рабочих нагрузок.

Примеры

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Совместимость: Julia 1.6

Для этой функции требуется версия не ниже Julia 1.6.

Threads.@spawn [:default|:interactive] expr

Создает Task и schedule его, чтобы запустить в любом доступном потоке в указанном пуле потоков (:default, если не указано). Задача выделяется в поток после того, как станет доступной. Чтобы дождаться завершения задачи, вызовите wait в результате этого макроса или вызовите fetch, чтобы дождаться завершения и затем получить возвращаемое значение.

Значения можно интерполировать в @spawn с помощью $, который копирует значение непосредственно в сконструированное базовое закрытие. Это позволит вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.

Поток, в котором выполняется задача, может измениться, если задача отдает управление, поэтому threadid() не следует расценивать как константу для задачи. Другие важные особенности см. в разделе Task Migration и более общем руководстве по многопоточности. См. также главу, посвященную пулам потоков.

Совместимость: Julia 1.3

Этот макрос впервые реализован в Julia 1.3.

Совместимость: Julia 1.4

Интерполяция значений с помощью $ доступна начиная с Julia версии 1.4.

Совместимость: Julia 1.9

Пул потоков можно указывать начиная с версии Julia 1.9.

Threads.threadid() -> Int

Получает идентификационный номер текущего потока выполнения. Главный поток имеет идентификатор 1.

Примеры

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4

Поток, в котором выполняется задача, может измениться, если задача отдает управление. Это называется Task Migration. По этой причине в большинстве случаев небезопасно использовать threadid() для обращения по индексам, например, к элементам вектора-буфера или вектора объектов с сохранением состояния.

Threads.maxthreadid() -> Int

Получает нижнюю границу числа доступных потоков (по всем пулам потоков), доступных процессу Julia, с семантикой atomic-acquire. Результат всегда будет больше или равен threadid(), а также threadid(task) для любой задачи, которую вы смогли наблюдать до вызова maxthreadid.

Threads.nthreads(:default | :interactive) -> Int

Получает текущее количество потоков в указанном пуле потоков. Потоки в пуле по умолчанию имеют идентификационные номера 1:nthreads(:default).

См. также описание BLAS.get_num_threads и BLAS.set_num_threads в стандартной библиотеке LinearAlgebra и nprocs() в стандартной библиотеке Distributed и Threads.maxthreadid().

Threads.threadpool(tid = threadid()) -> Symbol

Возвращает пул потоков указанного потока — либо :default, либо :interactive.

Threads.nthreadpools() -> Int

Возвращает количество текущих настроенных пулов потоков.

Threads.threadpoolsize(pool::Symbol = :default) -> Int

Получает количество потоков, доступных пулу потоков по умолчанию (или указанному пулу потоков).

См. также описание BLAS.get_num_threads и BLAS.set_num_threads в стандартной библиотеке LinearAlgebra и nprocs() в стандартной библиотеке Distributed.

См. также раздел Многопоточность.

Атомарные операции

@atomic var
@atomic order ex

var или ex помечается как выполняемый атомарно, если ex представляет собой поддерживаемое выражение. Если значение order не указано, по умолчанию используется :sequentially_consistent.

@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend

Выполняет операцию хранения, атомарно выраженную в правой части, и возвращает новое значение.

При использовании = эта операция преобразуется в вызов setproperty!(a.b, :x, new). При использовании любого оператора эта операция преобразуется в вызов modifyproperty!(a.b, :x, +, addend)[2].

@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2

Выполняет двоичную операцию, атомарно выраженную в правой части. Сохраняет результат в поле в первом аргументе и возвращает значения (old, new).

Эта операция преобразуется в вызов modifyproperty!(a.b, :x, func, arg2).

Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.

Примеры

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
1

julia> @atomic :sequentially_consistent a.x = 2 # задает поле х со значением а, с последовательной согласованностью
2

julia> @atomic a.x += 1 # пошагово увеличивает поле х со значением а, с последовательной согласованностью
3

julia> @atomic a.x + 1 # пошагово увеличивает поле х со значением а, с последовательной согласованностью
3 => 4

julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
4

julia> @atomic max(a.x, 10) # изменяет поле х со значением а на максимальное значение, с последовательной согласованностью
4 => 10

julia> @atomic a.x max 5 # снова изменяет поле х со значением а на максимальное значение, с последовательной согласованностью
10 => 10
Совместимость: Julia 1.7

Для этой функции требуется версия не ниже Julia 1.7.

@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

Сохраняет new в a.b.x и возвращает прежнее значение a.b.x.

Эта операция преобразуется в вызов swapproperty!(a.b, :x, new).

Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.

Примеры

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # заменяет поле х со значением а на значение 4, с последовательной согласованностью
1

julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
4
Совместимость: Julia 1.7

Для этой функции требуется версия не ниже Julia 1.7.

@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

Выполняет условную замену, атомарно выраженную парой, возвращая значения (old, success::Bool). Где success указывает, выполнялась ли замена.

Эта операция преобразуется в вызов replaceproperty!(a.b, :x, expected, desired).

Дополнительные сведения см. в разделе Атомарные операции в каждом поле руководства.

Примеры

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # заменяет поле х со значением а на значение 2 (если оно было равно 1), с последовательной согласованностью
(old = 1, success = true)

julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
2

julia> @atomicreplace a.x 1 => 2 # заменяет поле х со значением а на значение 2 (если оно было равно 1), с последовательной согласованностью
(old = 2, success = false)

julia> xchg = 2 => 0; # заменяет поле х со значением а на значение 0 (если оно было равно 2), с последовательной согласованностью

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # получает поле х со значением а, с последовательной согласованностью
0
Совместимость: Julia 1.7

Для этой функции требуется версия не ниже Julia 1.7.

Следующие интерфейсы API достаточно примитивны и, скорее всего, будут доступны через оболочку наподобие unsafe_*.

Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)

Следующие интерфейсы API устарели, хотя, скорее всего, будут поддерживаться еще в нескольких выпусках.

Threads.Atomic{T}

Содержит ссылку на объект типа T, разрешая только мгновенный доступ, то есть потокобезопасным способом.

Только определенные «простые» типы можно использовать атомарно, в частности примитивный логический тип, целочисленный тип и тип с плавающей запятой. Это Bool, Int8…​Int128, UInt8…​UInt128 и Float16…​Float64.

Новые атомарные объекты можно создавать на основе неатомарных значений; если они не заданы, атомарный объект инициализируется с нулевым значением.

Доступ к атомарным объектам можно осуществлять с помощью нотации []:

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

Атомарные операции используют префикс atomic_, такой как atomic_add!, atomic_xchg! и т. д.

Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

Атомарно выполняет операцию сравнения с обменом x

Атомарно сравнивает значение в x с cmp. Если они равны, выполняется запись newval в x. В ином случае x остается без изменений. Возвращает прежнее значение в x. Сравнивая возвращаемое значение с cmp (посредством ===), можно узнать, изменялся ли x и содержит ли он теперь новое значение newval.

Дополнительные сведения см. в инструкции LLVM cmpxchg.

Эту функцию можно использовать для реализации транзакционной семантики. До транзакции выполняется запись значения в x. После выполнения транзакции новое значение сохраняется только в том случае, если x не изменен за этот период.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

Атомарно изменяет значение в x

Атомарно изменяет значение в x на newval. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw xchg.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Атомарно добавляет val в x

Выполняет x[] += val атомарным образом. Возвращает прежнее значение. Не определено для Atomic{Bool}.

Дополнительные сведения см. в инструкции LLVM atomicrmw add.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Атомарно вычитает val из x

Выполняет x[] -= val атомарным образом. Возвращает прежнее значение. Не определено для Atomic{Bool}.

Дополнительные сведения см. в инструкции LLVM atomicrmw sub.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
Threads.atomic_and!(x::Atomic{T}, val::T) where T

Атомарно выполняет битовую операцию and x с val.

Выполняет x[] &= val атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw and.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

Атомарно выполняет битовую операцию nand (not-and) x с val

Выполняет x[] = ~(x[] & val) атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw nand.

Примеры

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
Threads.atomic_or!(x::Atomic{T}, val::T) where T

Атомарно выполняет битовую операцию or x с val.

Выполняет x[] |= val атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw or.

Примеры

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

Атомарно выполняет битовую операцию xor (exclusive-or) x с val

Выполняет x[] $= val атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw xor.

Примеры

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2
Threads.atomic_max!(x::Atomic{T}, val::T) where T

Атомарно сохраняет максимальное значение x и val в x

Выполняет x[] = max(x[], val) атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw max.

Примеры

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
Threads.atomic_min!(x::Atomic{T}, val::T) where T

Атомарно сохраняет минимальное значение x и val в x

Выполняет x[] = min(x[], val) атомарным образом. Возвращает прежнее значение.

Дополнительные сведения см. в инструкции LLVM atomicrmw min.

Примеры

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
Threads.atomic_fence()

Вставляет последовательно согласованный барьер памяти

Вставляет барьер памяти с последовательно согласованной семантикой упорядочивания. При необходимости доступны алгоритмы, то есть в тех случаях, когда упорядочивание получения/выпуска является недостаточным.

Это, скорее всего, весьма дорогая операция. Учитывая, что все остальные атомарные операции в Julia уже располагают семантикой получения/выпуска, явные барьеры в большинстве случаев не требуются.

Дополнительные сведения см. в инструкции LLVM fence.

Вызов ccall с использованием пула потоков libuv (экспериментальная функция)

@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

Макрос @threadcall вызывается так же, как и ccall, но он выполняет свои задачи в другом потоке. Рекомендуется использовать, если требуется вызвать блокирующую функцию С без блокирования текущего потока julia. Многопоточный режим ограничивается размером пула потока libuv, который по умолчанию включает 4 потока, но этот лимит можно увеличить, задав переменную среды UV_THREADPOOL_SIZE и перезапустив процесс julia.

Обратите внимание, что вызванная функция не должна выполнять обратный вызов к Julia.

Примитивы низкоуровневой синхронизации

Эти стандартные блоки используются для создания объектов регулярной синхронизации.

SpinLock()

Создает нереентерабельную активную блокировку «проверить — проверить — установить». Рекурсивное использование приводит к взаимоблокировке. Такой тип блокировки можно использовать только для кода, выполнение которого не отнимает много времени и который не блокируется (например, при выполнении ввода-вывода). В целом вместо нее можно использовать ReentrantLock.

Каждый lock должен сопоставляться с unlock. Если !islocked(lck::SpinLock) удерживается, trylock(lck) выполняется успешно, если только нет других задач, пытающихся удержать блокировку в то же самое время.

Активные блокировки «проверить — проверить — установить» являются самыми быстрыми при наличии примерно 30 конкурирующих потоков. Если конкурирующих потоков больше, следует выбрать другие методы синхронизации.