Документация Engee

Многопоточность

Презентация функций многопоточности Julia приведена в этом посте блога.

Запуск Julia с несколькими потоками

По умолчанию Julia запускается с одним потоком выполнения. Это можно проверить с помощью команды Threads.nthreads().

julia> Threads.nthreads()
1

Число потоков управления контролируется либо аргументом командной сроки -t/--threads, либо переменной среды JULIA_NUM_THREADS. Когда указано и то и другое, приоритет будет у -t/--threads.

Число потоков может быть задано либо целым числом (--threads=4), либо в виде auto (--threads=auto), где auto пытается вывести полезное заданное по умолчанию число потоков для использования (дополнительные сведения см. на странице Параметры командной строки).

Совместимость: Julia 1.5

Для аргумента командной строки -t/--threads требуется версия не ниже Julia 1.5. В более старых версиях необходимо использовать переменную среды.

Совместимость: Julia 1.7

Чтобы в качестве значения переменной среды JULIA_NUM_THREADS использовать auto, требуется версия не ниже Julia 1.7. В более старых версиях это значение игнорируется. Давайте запустим Julia с четырьмя потоками.

$ julia --threads 4

Проверим, что нам доступны четыре потока.

julia> Threads.nthreads()
4

Однако сейчас мы находимся в главном потоке. Чтобы проверить это, воспользуемся функцией Threads.threadid.

julia> Threads.threadid()
1

Если вы предпочитаете использовать переменную среды, ее можно задать следующим образом в Bash (Linux/macOS).

export JULIA_NUM_THREADS=4

В оболочке C на Linux/macOS или в CMD на Windows:

set JULIA_NUM_THREADS=4

В Powershell на Windows:

$env:JULIA_NUM_THREADS=4

Учтите, что это необходимо сделать перед запуском Julia.

Число потоков, задаваемое -t/--threads, распространяется на рабочие процессы, создаваемые с помощью параметров командной строки -p/--procs или --machine-file. Например, julia -p2 -t2 создает один основной процесс и два рабочих, и для всех трех процессов включено два потока. Для более точного контроля над рабочими потоками используйте addprocs и передавайте -t/--threads в качестве флагов выполнения (exeflags).

Пулы потоков

Когда потоки программы заняты выполнением множества задач, задачи могут выполняться с задержками, что может негативно сказаться на оперативности реагирования и интерактивности программы. Чтобы решить эту проблему, можно указать, что задача является интерактивной, когда вы планируете (Threads.@spawn) ее выполнить:

using Base.Threads
@spawn :interactive f()

В интерактивных задачах следует избегать выполнения операций с высокой задержкой, а если это длительные задачи, они должны выходить часто.

Julia можно запустить с одним или несколькими потоками, зарезервированными для выполнения интерактивных задач:

$ julia --threads 3,1

Аналогичным образом можно использовать и переменную среды JULIA_NUM_THREADS:

export JULIA_NUM_THREADS=3,1

При этом Julia запускается с 3 потоками в пуле потоков :default и 1 потоком в пуле потоков :interactive:

julia> using Base.Threads

julia> nthreads()
4

julia> nthreadpools()
2

julia> threadpool()
:default

julia> nthreads(:interactive)
1

Любое число или их оба можно заменить словом auto, в результате чего Julia выберет разумное значение по умолчанию.

Взаимодействие и синхронизация

Хотя потоки Julia могут взаимодействовать через общую память, как известно, написать корректный многопоточный код, свободный от гонки по данным, довольно сложно. Каналы (Channel) в Julia являются потокобезопасными и могут использоваться для безопасного взаимодействия.

Устранение гонки по данным

Вы несете полную ответственность за то, чтобы в вашей программе не возникала гонка по данным. Если не соблюдать это требование, ничто из обещанного здесь не гарантируется. Наблюдаемые результаты могут быть совершенно непредсказуемыми.

Для соблюдения этого требования лучше всего получать блокировку при любом доступе к данным, которые видимы для множества потоков. Например, в большинстве случаев следует использовать следующий шаблон кода.

julia> lock(lk) do
           use(a)
       end

julia> begin
           lock(lk)
           try
               use(a)
           finally
               unlock(lk)
           end
       end

Здесь lk — это блокировка (например, ReentrantLock()), а a — данные.

Кроме того, при наличии гонки по данным Julia не обеспечивает безопасность по памяти. Будьте очень осторожны с чтением любых данных, если возможна их запись в другом потоке! При изменении данных (например, присвоении значения переменной в глобальной области или в замыкании), которые доступны другим потокам, всегда используйте шаблон блокировки, приведенный выше.

Thread 1:
global b = false
global a = rand()
global b = true

Thread 2:
while !b; end
bad_read1(a) # Доступ к `a` здесь НЕ безопасен!

Thread 3:
while !@isdefined(a); end
bad_read2(a) # Доступ к `a` здесь НЕ безопасен.

Макрос @threads

Рассмотрим простой пример на основе наших собственных потоков. Создадим массив нулей.

julia> a = zeros(10)
10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

Мы будем работать с этим массивом, используя четыре потока одновременно. Каждый поток будет записывать свой идентификатор в каждой позиции.

Julia поддерживает параллельные циклы на основе макроса Threads.@threads. Этот макрос ставится перед циклом for и указывает Julia, что цикл является многопоточным регионом.

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

Область итерации распределяется между потоками, а затем каждый поток записывает свой идентификатор в назначенные позиции.

julia> a
10-element Vector{Float64}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

Обратите внимание, что у Threads.@threads отсутствует необязательный параметр редукции, такой как @distributed.

Использование @threads без гонки по данным

Рассмотрим пример обычного суммирования.

julia> function sum_single(a)
           s = 0
           for i in a
               s += i
           end
           s
       end
sum_single (generic function with 1 method)

julia> sum_single(1:1_000_000)
500000500000

Простое добавление @threads приводит к гонке по данным с одновременным чтением и записью s несколькими потоками.

julia> function sum_multi_bad(a)
           s = 0
           Threads.@threads for i in a
               s += i
           end
           s
       end
sum_multi_bad (generic function with 1 method)

julia> sum_multi_bad(1:1_000_000)
70140554652

Обратите внимание, что результат не равен 500000500000, как это должно быть, и, скорее всего, будет меняться при каждом вычислении.

Для решения этой проблемы можно использовать буферы, специфичные для конкретной задачи, чтобы разделить сумму на фрагменты без гонки. Здесь sum_single используется повторно с собственным внутренним буфером s, а вектор a разбивается на фрагменты nthreads() для параллельной работы с помощью nthreads() запланированных (@spawn) задач.

julia> function sum_multi_good(a)
           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
           tasks = map(chunks) do chunk
               Threads.@spawn sum_single(chunk)
           end
           chunk_sums = fetch.(tasks)
           return sum_single(chunk_sums)
       end
sum_multi_good (generic function with 1 method)

julia> sum_multi_good(1:1_000_000)
500000500000

Буферами не следует управлять на основе threadid(), то есть buffers = zeros(Threads.nthreads()), поскольку могут выводиться одновременные задачи , то есть несколько одновременных задач могут использовать один и тот же буфер в одном потоке, что создает риск возникновения гонок по данным. Кроме того, при наличии нескольких потоков задачи могут менять поток в точках вывода. Это называется миграцией задач.

Другим вариантом является использование атомарных операций с переменными, общими для задач или потоков, что может быть более производительным в зависимости от характеристик операций.

Атомарные операции

Julia поддерживает доступ к значениям и их изменение атомарным образом, то есть с обеспечением потокобезопасности и предотвращением состояния гонки. Чтобы обеспечить такой доступ к значению, можно поместить его в оболочку Threads.Atomic (причем значение должно быть примитивного типа). См. следующий пример.

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Vector{Float64}:
 0.0
 1.0
 7.0
 3.0

julia> i[]
 10

julia> ids
4-element Vector{Float64}:
 1.0
 2.0
 3.0
 4.0

Если бы мы попытались выполнить сложение без метки атомарности, ответ мог быть неверным из-за состояния гонки. Пример того, что произошло бы без исключения гонки:

julia> using Base.Threads

julia> Threads.nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

Атомарные операции в каждом поле

Атомарные операции также можно использовать на более детальном уровне с помощью макросов @atomic, @atomicswap и @atomicreplace.

Подробные сведения о модели памяти и других аспектах их дизайна приведены в Манифесте атомарных операций Julia, который будет официально опубликован позже.

Метку @atomic можно использовать для любого поля в объявлении структуры, и тогда каждая запись также должна быть помечена @atomic и должна использовать один из определенных порядков атомарных операций (:monotonic, :acquire, :release, :acquire_release или :sequentially_consistent). Для любого чтения атомарного поля также можно указать ограничение порядка атомарных операций. Если оно не указано, используется нестрогий монотонный порядок (monotonic).

Совместимость: Julia 1.7

Для атомарных операций в каждом поле требуется версия не ниже Julia 1.7.

Побочные эффекты и изменяемые аргументы функции

При многопоточности следует с осторожностью использовать функции, не являющиеся чистыми, так как можно получить некорректный ответ. Например, функции, имеющие имя с окончанием !, стандартно модифицируют свои аргументы и потому не являются чистыми.

@threadcall

Внешние библиотеки, например вызываемые через ccall, создают проблемы для используемого в Julia механизма ввода-вывода на основе задач. Если библиотека C выполняет блокирующую операцию, планировщик Julia не может выполнять никакие другие задачи, пока вызов не будет завершен. (Исключением является вызов пользовательского кода C с обратным вызовом Julia, который может отдать управление, либо кода C с вызовом функции jl_yield(), являющейся эквивалентом yield на языке C.)

Макрос @threadcall позволяет избежать приостановки выполнения в подобных ситуациях. Он назначает выполнение функции C в отдельном потоке. Для этого используется пул размером в четыре потока по умолчанию. Размер пула потоков контролируется переменной среды UV_THREADPOOL_SIZE. При использовании этого макроса, когда запрашивающая задача (в главном цикле событий Julia) ожидает свободный поток или выполняет функцию в доступном потоке, она отдает управление другим задачам. При этом @threadcall не завершает работу, пока не завершится выполнение. Поэтому с точки зрения пользователя этот макрос является блокирующим вызовом, как и другие API Julia.

Крайне важно, чтобы вызванная функция не осуществляла обратный вызов Julia, так как это приведет к аварийному завершению программы.

Возможно, в будущих версиях Julia макрос @threadcall будет удален или изменен.

Предупреждения

В настоящее время большинство операций в среде выполнения Julia и стандартных библиотеках обеспечивают потокобезопасность при условии, что пользовательский код не содержит гонки по данным. Однако в некоторых областях поддержка потоков пока является нестабильной. Многопоточное программирование неизбежно сопряжено с множеством сложностей, поэтому, если использующая потоки программа ведет себя необычным или нежелательным образом (например, аварийно завершает работу или выдает неожиданные результаты), подозрение в первую очередь должно падать на взаимодействия между потоками.

При использовании потоков в Julia необходимо учитывать ряд определенных ограничений и рисков.

  • При использовании типа коллекции из Base во множестве потоков одновременно, если хотя бы один поток изменяет коллекцию (частые примеры: push! для массивов или вставка элементов в Dict), необходимо использовать ручную блокировку.

  • Назначение выполнения в @spawn является недетерминированным, и полагаться на него не следует.

  • Использование задач, опирающихся на вычислительные ресурсы без выделения памяти, может предотвратить запуск сборки мусора в других потоках, где память выделяется. В таких случаях может быть необходимо вручную вставить вызов GC.safepoint(), чтобы обеспечить ее запуск. В будущем данное ограничение будет устранено.

  • Старайтесь не использовать параллельно операции верхнего уровня, такие как include или eval, для определений типов, методов и модулей.

  • Учтите, что включение потоков может нарушить работу финализаторов, регистрируемых библиотекой. Чтобы свободно использовать для них потоки, может быть необходимо дополнительное усовершенствование экосистемы. Подробности см. в следующем разделе.

Миграция задач

После того как задача начала выполняться в определенном потоке, она может переместиться в другой поток, если выдает значение.

Такие задачи могут запускаться с помощью @spawn или @threads, хотя параметр расписания :static для @threads замораживает ИД потока.

Это означает, что в большинстве случаев threadid() не следует рассматривать как константу внутри задачи, а значит, не следует использовать для индексирования в вектор буферов или объектов с отслеживанием состояния.

Совместимость: Julia 1.7

Миграция задач появилась в версии Julia 1.7. До этого задачи всегда оставались в том же потоке, в котором они были запущены.

Безопасное применение финализаторов

Поскольку финализаторы могут прерывать выполнение любого кода, нужно соблюдать особую осторожность при их взаимодействии с какими-либо глобальными состояниями. К сожалению, изменения глобального состояния — это основная причина их использования (использовать в качестве финализатора чистую функцию, как правило, не имеет смысла). Поэтому возникает непростая ситуация. Существует несколько подходов к решению этой проблемы.

  1. В однопоточном режиме можно вызвать в коде внутреннюю функцию jl_gc_enable_finalizers языка C, чтобы запуск финализаторов не назначался в критически важном регионе. Это используется в ряде внутренних функций (например, в наших блокировках C) для предотвращения рекурсии при выполнении некоторых операций (инкрементной загрузки пакетов, генерации кода и пр.). Сочетание блокировки и данного флага позволяет сделать финализаторы безопасными.

  2. Другая стратегия, реализованная в некоторых элементах модуля Base, состоит в явной задержке запуска финализатора до тех пор, пока он не сможет получить блокировку без рекурсии. В следующем примере показано применение этой стратегии для Distributed.finalize_ref.

     function finalize_ref(r::AbstractRemoteRef)
         if r.where > 0 # Проверяет, не запущен ли уже финализатор
             if islocked(client_refs) || !trylock(client_refs)
                 # Задержка запуска финализатора при отсутствии свободной блокировки
                 finalizer(finalize_ref, r)
                 return nothing
             end
             try # После `lock` всегда должна следовать `try`
                 if r.where > 0 # Здесь нужна повторная проверка
                     # Здесь должна быть фактическая очистка
                     r.where = 0
                 end
             finally
                 unlock(client_refs)
             end
         end
         nothing
     end
  3. Связанная третья стратегия — использовать очередь без отдачи управления. В модуле Base пока не реализована очередь без блокировки, однако можно использовать Base.IntrusiveLinkedListSynchronized{T}. Эта стратегия зачастую хорошо подходит для кода с циклами событий. Например, она используется в Gtk.jl для управления подсчетом ссылок за время существования. Данный подход не подразумевает каких-либо явных действий внутри финализатора (finalizer); мы просто добавляем его в очередь для запуска в более безопасное время. На самом деле планировщик задач Julia уже это реализует, поэтому определение финализатора в форме x -> @spawn do_cleanup(x) является одним из примеров данного подхода. Однако учтите, что эта реализация не контролирует, в каком потоке запускается do_cleanup, поэтому для do_cleanup все равно необходимо получить блокировку. Но если вы реализуете собственную очередь, это может не потребоваться, так как вы можете явно очистить свою очередь из собственного потока.