Документация Engee

Многопоточность

Презентация функций многопоточности Julia приведена в этом посте блога.

Запуск Julia с несколькими потоками

По умолчанию Julia запускается с одним потоком выполнения. Это можно проверить с помощью команды Threads.nthreads().

julia> Threads.nthreads()
1

Число потоков управления контролируется либо аргументом командной сроки -t/--threads, либо переменной среды JULIA_NUM_THREADS. Когда указано и то, и другое, приоритет будет у -t/--threads.

Число потоков может быть задано либо целым числом (--threads=4), либо в виде auto (--threads=auto), где auto пытается вывести полезное заданное по умолчанию число потоков для использования (дополнительные сведения см. на странице Параметры командной строки).

Совместимость: Julia 1.5

Для аргумента командной строки -t/--threads требуется версия не ниже Julia 1.5. В более старых версиях необходимо использовать переменную среды.

Совместимость: Julia 1.7

Чтобы в качестве значения переменной среды JULIA_NUM_THREADS использовать auto, требуется версия не ниже Julia 1.7. В более старых версиях это значение игнорируется. Давайте запустим Julia с четырьмя потоками.

$ julia --threads 4

Проверим, что нам доступны четыре потока.

julia> Threads.nthreads()
4

Однако сейчас мы находимся в главном потоке. Чтобы проверить это, воспользуемся функцией Threads.threadid.

julia> Threads.threadid()
1

Если вы предпочитаете использовать переменную среды, ее можно задать следующим образом в Bash (Linux/macOS):

    export JULIA_NUM_THREADS=4
В оболочке C на Linux/macOS или в CMD на Windows:
    set JULIA_NUM_THREADS=4
В Powershell на Windows:
    $env:JULIA_NUM_THREADS=4
Учтите, что это необходимо сделать *перед* запуском Julia.

Число потоков, задаваемое -t/--threads, распространяется на рабочие процессы, создаваемые с помощью параметров командной строки -p/--procs или --machine-file. Например, julia -p2 -t2 создает один основной процесс и два рабочих, и для всех трех процессов включено два потока. Для более точного контроля над рабочими потоками используйте addprocs и передавайте -t/--threads в качестве флагов выполнения (exeflags).

Несколько потоков сборщика мусора

Сборщик мусора может использовать несколько потоков. Используемое количество либо равно половине числа рабочих потоков вычислений, либо задается с помощью аргумента командной строки --gcthreads или переменной среды JULIA_NUM_GC_THREADS.

Совместимость: Julia 1.10

Для аргумента командной строки --gcthreads требуется версия не ниже Julia 1.10.

Пулы потоков

Когда потоки программы заняты выполнением множества задач, задачи могут выполняться с задержками, что может негативно сказаться на оперативности реагирования и интерактивности программы. Чтобы решить эту проблему, можно указать, что задача является интерактивной, когда вы планируете (Threads.@spawn) ее выполнить:

using Base.Threads
@spawn :interactive f()

В интерактивных задачах следует избегать выполнения операций с высокой задержкой, а если это длительные задачи, они должны выходить часто.

Julia можно запустить с одним или несколькими потоками, зарезервированными для выполнения интерактивных задач:

$ julia --threads 3,1

Аналогичным образом можно использовать и переменную среды JULIA_NUM_THREADS:

export JULIA_NUM_THREADS=3,1

При этом Julia запускается с 3 потоками в пуле потоков :default и 1 потоком в пуле потоков :interactive:

julia> using Base.Threads

julia> nthreadpools()
2

julia> threadpool() # главный поток находится в пуле интерактивных потоков
:interactive

julia> nthreads(:default)
3

julia> nthreads(:interactive)
1

julia> nthreads()
3

Версия nthreads без аргументов возвращает количество потоков в пуле по умолчанию.

В зависимости от того, была ли запущена среда Julia с интерактивными потоками, основной поток находится либо в пуле потоков по умолчанию, либо в пуле интерактивных потоков.

Любое число или их оба можно заменить словом auto, в результате чего Julia выберет разумное значение по умолчанию.

Макрос @threads

Рассмотрим простой пример на основе наших собственных потоков. Создадим массив нулей.

julia> a = zeros(10)
10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

Мы будем работать с этим массивом, используя четыре потока одновременно. Каждый поток будет записывать свой идентификатор в каждой позиции.

Julia поддерживает параллельные циклы на основе макроса Threads.@threads. Этот макрос ставится перед циклом for и указывает Julia, что цикл является многопоточным регионом.

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

Область итерации распределяется между потоками, а затем каждый поток записывает свой идентификатор в назначенные позиции.

julia> a
10-element Vector{Float64}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

Обратите внимание, что у Threads.@threads отсутствует необязательный параметр редукции, такой как @distributed.

Использование @threads без гонки по данным

Понятие гонки по данным подробно рассматривается в разделе Взаимодействие и гонки по данных между потоками. Пока просто знайте, что гонка по данным может привести к неправильным результатам и опасным ошибкам.

Допустим, мы хотим сделать функцию sum_single ниже многопоточной.

julia> function sum_single(a)
           s = 0
           for i in a
               s += i
           end
           s
       end
sum_single (generic function with 1 method)

julia> sum_single(1:1_000_000)
500000500000

Простое добавление @threads приводит к гонке по данным с одновременным чтением и записью s несколькими потоками.

julia> function sum_multi_bad(a)
           s = 0
           Threads.@threads for i in a
               s += i
           end
           s
       end
sum_multi_bad (generic function with 1 method)

julia> sum_multi_bad(1:1_000_000)
70140554652

Обратите внимание, что результат не равен 500000500000, как это должно быть, и, скорее всего, будет меняться при каждом вычислении.

Для решения этой проблемы можно использовать буферы, специфичные для конкретной задачи, чтобы разделить сумму на фрагменты без гонки. Здесь sum_single используется повторно с собственным внутренним буфером s. Входной вектор a разбивается на фрагменты nthreads() для параллельной работы. Затем мы используем Threads.@spawn для создания задач, которые суммируют каждый фрагмент отдельно. Наконец, мы суммируем результаты каждой задачи, снова используя sum_single:

julia> function sum_multi_good(a)
           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
           tasks = map(chunks) do chunk
               Threads.@spawn sum_single(chunk)
           end
           chunk_sums = fetch.(tasks)
           return sum_single(chunk_sums)
       end
sum_multi_good (generic function with 1 method)

julia> sum_multi_good(1:1_000_000)
500000500000

Буферами не следует управлять на основе threadid(), то есть buffers = zeros(Threads.nthreads()), поскольку могут выводиться одновременные задачи, то есть несколько одновременных задач могут использовать один и тот же буфер в одном потоке, что создает риск возникновения гонок по данным. Кроме того, при наличии нескольких потоков задачи могут менять поток в точках вывода. Это называется миграцией задач.

Другим вариантом является использование атомарных операций с переменными, общими для задач или потоков, что может быть более производительным в зависимости от характеристик операций.

Взаимодействие и гонка по данным между потоками

Хотя потоки Julia могут взаимодействовать через общую память, как известно, написать корректный многопоточный код, свободный от гонки по данным, довольно сложно. Каналы (Channel) в Julia являются потокобезопасными и могут использоваться для безопасного взаимодействия. В разделах ниже объясняется, как использовать блокировки и атомарные операции, чтобы избежать гонок по данным.

Устранение гонки по данным

Вы несете полную ответственность за то, чтобы в вашей программе не возникала гонка по данным. Если не соблюдать это требование, ничто из обещанного здесь не гарантируется. Наблюдаемые результаты могут быть совершенно непредсказуемыми.

При наличии гонки по данным Julia не обеспечивает безопасность памяти. Будьте очень осторожны с чтением любых данных, если возможна их запись в другом потоке, так как это может привести к ошибкам сегментации или еще более серьезным проблемам. Ниже приведена пара небезопасных способов доступа к глобальным переменным из разных потоков.

Thread 1:
global b = false
global a = rand()
global b = true

Thread 2:
while !b; end
bad_read1(a) # Доступ к `a` здесь НЕбезопасен!

Thread 3:
while !@isdefined(a); end
bad_read2(a) # Доступ к `a` здесь НЕбезопасен.

Использование блокировок для исключения гонок по данным

Важным инструментом, позволяющим избежать гонок по данным и в результате написать потокобезопасный код, является блокировка. Блокировку можно блокировать и разблокировать. Если поток заблокировал блокировку и не разблокировал ее, говорят, что он «удерживает» ее. Если существует только одна блокировка и мы пишем код, требующий удержания блокировки для доступа к некоторым данным, можно гарантировать, что несколько потоков никогда не будут обращаться к одним и тем же данным одновременно. Примите во внимание, что связь между блокировкой и переменной устанавливает программист, а не программа.

Например, можно создать блокировку my_lock и заблокировать ее на время изменения переменной my_variable. Проще всего это сделать с помощью макроса @lock:

julia> my_lock = ReentrantLock();

julia> my_variable = [1, 2, 3];

julia> @lock my_lock my_variable[1] = 100
100

При использовании аналогичного шаблона с той же блокировкой и переменной, но в другом потоке, операции будут свободны от гонок по данным.

Описанную выше операцию с функциональной версией lock можно выполнить и следующими двумя способами:

julia> lock(my_lock) do
           my_variable[1] = 100
       end
100

julia> begin
           lock(my_lock)
           try
               my_variable[1] = 100
           finally
               unlock(my_lock)
           end
       end
100

Все три варианта равнозначны. Обратите внимание, что для последней версии требуется явный try-блок, чтобы обеспечить постоянную разблокировку блокировки, тогда как первые две версии делают это внутренним образом. При изменении данных (например, присвоении значения переменной в глобальной области или в замыкании), которые доступны другим потокам, всегда следует использовать шаблон блокировки, приведенный выше. Невыполнение этого требования может привести к непредвиденным и серьезным последствиям.

Атомарные операции

Julia поддерживает доступ к значениям и их изменение атомарным образом, то есть с обеспечением потокобезопасности и предотвращением состояния гонки. Чтобы обеспечить такой доступ к значению, можно поместить его в оболочку Threads.Atomic (причем значение должно быть примитивного типа). См. следующий пример.

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Vector{Float64}:
 0.0
 1.0
 7.0
 3.0

julia> i[]
 10

julia> ids
4-element Vector{Float64}:
 1.0
 2.0
 3.0
 4.0

Если бы мы попытались выполнить сложение без метки атомарности, ответ мог быть неверным из-за состояния гонки. Пример того, что произошло бы без исключения гонки:

julia> using Base.Threads

julia> Threads.nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

Атомарные операции в каждом поле

Также можно использовать атомарные операции на более детальном уровне с помощью макросов @atomic, @atomicswap, @atomicreplace и @atomiconce.

Подробные сведения о модели памяти и других аспектах их дизайна приведены в Манифесте атомарных операций Julia, который будет официально опубликован позже.

Метку @atomic можно использовать для любого поля в объявлении структуры, и тогда каждая запись также должна быть помечена @atomic и должна использовать один из определенных порядков атомарных операций (:monotonic, :acquire, :release, :acquire_release или :sequentially_consistent). Для любого чтения атомарного поля также можно указать ограничение порядка атомарных операций. Если оно не указано, используется нестрогий монотонный порядок (monotonic).

Совместимость: Julia 1.7

Для атомарных операций в каждом поле требуется версия не ниже Julia 1.7.

Побочные эффекты и изменяемые аргументы функции

При многопоточности следует с осторожностью использовать функции, не являющиеся чистыми, так как можно получить некорректный ответ. Например, функции, имеющие имя с окончанием !, стандартно модифицируют свои аргументы и потому не являются чистыми.

@threadcall

Внешние библиотеки, например вызываемые через ccall, создают проблемы для используемого в Julia механизма ввода-вывода на основе задач. Если библиотека C выполняет блокирующую операцию, планировщик Julia не может выполнять никакие другие задачи, пока вызов не будет завершен. (Исключением является вызов пользовательского кода C с обратным вызовом Julia, который может отдать управление, либо кода C с вызовом функции jl_yield(), являющейся эквивалентом yield на языке C.)

Макрос @threadcall позволяет избежать приостановки выполнения в подобных ситуациях. Он назначает выполнение функции C в отдельном потоке. Для этого используется пул размером в четыре потока по умолчанию. Размер пула потоков контролируется переменной среды UV_THREADPOOL_SIZE. При использовании этого макроса, когда запрашивающая задача (в главном цикле событий Julia) ожидает свободный поток или выполняет функцию в доступном потоке, она отдает управление другим задачам. При этом @threadcall не завершает работу, пока не завершится выполнение. Поэтому с точки зрения пользователя этот макрос является блокирующим вызовом, как и другие API Julia.

Крайне важно, чтобы вызванная функция не осуществляла обратный вызов Julia, так как это приведет к аварийному завершению программы.

Возможно, в будущих версиях Julia макрос @threadcall будет удален или изменен.

Предупреждения

В настоящее время большинство операций в среде выполнения Julia и стандартных библиотеках обеспечивают потокобезопасность при условии, что пользовательский код не содержит гонки по данным. Однако в некоторых областях поддержка потоков пока является нестабильной. Многопоточное программирование неизбежно сопряжено с множеством сложностей, поэтому, если использующая потоки программа ведет себя необычным или нежелательным образом (например, аварийно завершает работу или выдает неожиданные результаты), подозрение в первую очередь должно падать на взаимодействия между потоками.

При использовании потоков в Julia необходимо учитывать ряд определенных ограничений и рисков.

  • При использовании типа коллекции из Base во множестве потоков одновременно, если хотя бы один поток изменяет коллекцию (частые примеры: push! для массивов или вставка элементов в Dict), необходимо использовать ручную блокировку.

  • Назначение выполнения в @spawn является недетерминированным, и полагаться на него не следует.

  • Использование задач, опирающихся на вычислительные ресурсы без выделения памяти, может предотвратить запуск сборки мусора в других потоках, где память выделяется. В таких случаях может быть необходимо вручную вставить вызов GC.safepoint(), чтобы обеспечить ее запуск. В будущем данное ограничение будет устранено.

  • Старайтесь не использовать параллельно операции верхнего уровня, такие как include или eval, для определений типов, методов и модулей.

  • Учтите, что включение потоков может нарушить работу финализаторов, регистрируемых библиотекой. Чтобы свободно использовать для них потоки, может быть необходимо дополнительное усовершенствование экосистемы. Подробные сведения см. в разделе Безопасное применение финализаторов.

Миграция задач

После того как задача начала выполняться в определенном потоке, она может переместиться в другой поток, если выдает значение.

Такие задачи могут запускаться с помощью @spawn или @threads, хотя параметр расписания :static для @threads замораживает ИД потока.

Это означает, что в большинстве случаев threadid() не следует рассматривать как константу внутри задачи, а значит, не следует использовать для индексирования в вектор буферов или объектов с отслеживанием состояния.

Совместимость: Julia 1.7

Миграция задач появилась в версии Julia 1.7. До этого задачи всегда оставались в том же потоке, в котором они были запущены.

Безопасное применение финализаторов

Поскольку финализаторы могут прерывать выполнение любого кода, нужно соблюдать особую осторожность при их взаимодействии с какими-либо глобальными состояниями. К сожалению, изменения глобального состояния — это основная причина их использования (использовать в качестве финализатора чистую функцию, как правило, не имеет смысла). Поэтому возникает непростая ситуация. Существует несколько подходов к решению этой проблемы.

  1. В однопоточном режиме можно вызвать в коде внутреннюю функцию jl_gc_enable_finalizers языка C, чтобы запуск финализаторов не назначался в критически важном регионе. Это используется в ряде внутренних функций (например, в наших блокировках C) для предотвращения рекурсии при выполнении некоторых операций (инкрементной загрузки пакетов, генерации кода и пр.). Сочетание блокировки и данного флага позволяет сделать финализаторы безопасными.

  2. Другая стратегия, реализованная в некоторых элементах модуля Base, состоит в явной задержке запуска финализатора до тех пор, пока он не сможет получить блокировку без рекурсии. В следующем примере показано применение этой стратегии для Distributed.finalize_ref.

     function finalize_ref(r::AbstractRemoteRef)
     if r.where > 0 # Проверяет, не запущен ли уже финализатор if islocked(client_refs) || !trylock(client_refs) # Задержка запуска финализатора, если невозможно получить блокировку finalizer(finalize_ref, r) return nothing end try # За `lock` всегда должна следовать `try` if r.where > 0 # Здесь нужна повторная проверка # Здесь должна быть фактическая очистка r.where = 0 end finally unlock(client_refs) end end nothing end
3. A related third strategy is to use a yield-free queue. We don't currently
have a lock-free queue implemented in Base, but
`Base.IntrusiveLinkedListSynchronized{T}` is suitable. This can frequently be a
good strategy to use for code with event loops. For example, this strategy is
employed by `Gtk.jl` to manage lifetime ref-counting. In this approach, we
don't do any explicit work inside the `finalizer`, and instead add it to a queue
to run at a safer time. In fact, Julia's task scheduler already uses this, so
defining the finalizer as `x -> @spawn do_cleanup(x)` is one example of this
approach. Note however that this doesn't control which thread `do_cleanup`
runs on, so `do_cleanup` would still need to acquire a lock. That
doesn't need to be true if you implement your own queue, as you can explicitly
only drain that queue from your thread.