Multithreading
#
Base.Threads.@threads
— Macro
Threads.@threads [schedule] for ... end
A macro to execute a for loop in parallel. The iteration space is distributed to coarse-grained tasks. This policy can be specified by the schedule argument. The execution of the loop waits for the evaluation of all iterations.
See also: @spawn and pmap in Distributed.
Extended help
Semantics
Unless stronger guarantees are specified by the scheduling option, the loop executed by the @threads macro has the following semantics.
The @threads macro executes the loop body in an unspecified order and potentially concurrently. It does not specify the exact assignment of iterations to tasks and worker threads. The assignments can differ for each execution. The loop body code (including any code transitively called from it) must not make any assumptions about the distribution of iterations to tasks or about the worker thread on which they are executed. The loop body for each iteration must be able to make forward progress independently of other iterations and must be free from data races. As such, invalid synchronization across iterations may deadlock, while unsynchronized memory accesses may result in undefined behavior.
For example, the above conditions imply that:
- A lock taken in an iteration must be released within the same iteration.
- Communicating between iterations using blocking primitives such as Channels is incorrect.
- Write only to locations not shared across iterations (unless a lock or atomic operation is used).
- Unless the :static schedule is used, the value of threadid() may change even within a single iteration. See Task Migration.
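As a minimal sketch of the rule about writing only to unshared locations, the loop below gives every iteration its own output slot and funnels the one shared update through an atomic operation. The names xs, squares, and total are illustrative and not part of the API.

```julia
xs = collect(1:1000)
squares = zeros(Int, length(xs))      # one slot per iteration, never shared
total = Threads.Atomic{Int}(0)        # shared accumulator, only touched atomically

Threads.@threads for i in eachindex(xs)
    squares[i] = xs[i]^2              # safe: iteration i is the only writer of slot i
    Threads.atomic_add!(total, xs[i]) # safe: atomic read-modify-write on shared state
end

println(total[])                      # prints 500500
```

Replacing the atomic with a plain shared counter such as `acc += xs[i]` would introduce a data race, which the semantics above leave undefined.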
Schedulers
Without the scheduler argument, the exact scheduling is unspecified and may vary across Julia releases. Currently, :dynamic is used when no scheduler is specified.
Compatibility: Julia 1.5
The |
:dynamic (default)
The :dynamic scheduler executes iterations dynamically on available worker threads. The current implementation assumes that the workload for each iteration is uniform. However, this assumption may be removed in the future.
This scheduling option is merely a hint to the underlying execution mechanism. However, a few properties can be expected. The number of Tasks used by the :dynamic scheduler is bounded by a small constant multiple of the number of available worker threads (Threads.threadpoolsize()). Each task processes contiguous regions of the iteration space. Thus, @threads :dynamic for x in xs; f(x); end is typically more efficient than @sync for x in xs; @spawn f(x); end if length(xs) is significantly larger than the number of worker threads and the run time of f(x) is small relative to the cost of spawning and synchronizing a task (typically less than 10 microseconds).
Compatibility: Julia 1.8
The |
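The two loop forms compared above can be sketched side by side. This is only an illustration, assuming Julia 1.8 or later for the explicit :dynamic argument; f, xs, out_threads, and out_spawn are hypothetical names. Both forms compute the same result, but the first uses a bounded number of tasks while the second spawns one task per element.

```julia
f(x) = x % 7                          # hypothetical cheap per-element function
xs = 1:10_000

# A bounded number of tasks, each handling a contiguous chunk of indices.
out_threads = Vector{Int}(undef, length(xs))
Threads.@threads :dynamic for i in eachindex(xs)
    out_threads[i] = f(xs[i])
end

# One task per element: same result, but far more spawn/synchronization overhead.
out_spawn = Vector{Int}(undef, length(xs))
@sync for i in eachindex(xs)
    Threads.@spawn out_spawn[i] = f(xs[i])
end

println(out_threads == out_spawn)     # prints true
```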
:greedy
The :greedy scheduler spawns up to Threads.threadpoolsize() tasks, each greedily working on the given iterated values as they are produced. As soon as one task finishes its work, it takes the next value from the iterator. Work done by any individual task is not necessarily on contiguous values from the iterator. The given iterator may produce values forever; only the iterator interface is required (no indexing).
This scheduling option is generally a good choice if the workload of individual iterations is not uniform or has a large spread.
Compatibility: Julia 1.11
Value |
:static
The :static scheduler creates one task per thread and divides the iterations equally among them, assigning each task specifically to each thread. In particular, the value of threadid() is guaranteed to be constant within one iteration. Specifying :static is an error if used from inside another @threads loop or from a thread other than 1.
Examples
To illustrate the different scheduling strategies, consider the following function busywait, which contains a non-yielding timed loop that runs for a given number of seconds.
julia> function busywait(seconds)
           tstart = time_ns()
           while (time_ns() - tstart) / 1e9 < seconds
           end
       end
julia> @time begin
           Threads.@spawn busywait(5)
           Threads.@threads :static for i in 1:Threads.threadpoolsize()
               busywait(1)
           end
       end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
           Threads.@spawn busywait(5)
           Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
               busywait(1)
           end
       end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
The :dynamic example takes 2 seconds, since one of the non-occupied threads is able to run two of the 1-second iterations to complete the for loop.
#
Base.Threads.foreach
— Function
Threads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())
Similar to foreach(f, channel), but iteration over channel and calls to f are split across ntasks tasks spawned by Threads.@spawn. This function will wait for all internally spawned tasks to complete before returning.
If schedule isa FairSchedule, Threads.foreach will attempt to spawn tasks in a manner that enables Julia's scheduler to more freely load-balance work items across threads. This approach generally has higher per-item overhead, but may perform better than StaticSchedule when running concurrently with other multithreaded workloads.
If schedule isa StaticSchedule, Threads.foreach will spawn tasks in a manner that incurs lower per-item overhead than FairSchedule, but is less amenable to load-balancing. This approach may thus be more suitable for fine-grained, uniform workloads, but may perform worse than FairSchedule when running concurrently with other multithreaded workloads.
Examples
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Compatibility: Julia 1.6
This function requires at least Julia 1.6.
#
Base.Threads.@spawn
— Macro
Threads.@spawn [:default|:interactive] expr
Creates a Task and schedules it to run on any available thread in the specified threadpool (:default if unspecified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or call fetch to wait and then obtain its return value.
Values can be interpolated into @spawn via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.
The thread that the task runs on may change if the task yields, so |
Compatibility: Julia 1.3
This macro is available as of Julia 1.3.
Compatibility: Julia 1.4
Interpolation of values using |
Compatibility: Julia 1.9
A threadpool may be specified as of Julia 1.9.
Examples
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
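The effect of $ interpolation in @spawn can be sketched as follows. This is an illustrative example, not part of the original docstring: the Base.Event named ev is used here only as a gate, so that both tasks are guaranteed to run their bodies after x has been rebound.

```julia
ev = Base.Event()                        # gate: tasks block here until notified
x = 1

t_ref = Threads.@spawn (wait(ev); x)     # looks up x when the task body runs
t_val = Threads.@spawn (wait(ev); $x)    # value of x is copied at spawn time

x = 2                                    # rebind x in the current task
notify(ev)                               # release both tasks

println((fetch(t_val), fetch(t_ref)))    # prints (1, 2)
```

The interpolated task still sees the value 1, while the non-interpolated task observes the later assignment.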
#
Base.Threads.threadid
— Function
Threads.threadid() -> Int
Gets the ID number of the current thread of execution. The master thread has ID 1.
Examples
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
           println(Threads.threadid())
       end
4
2
5
4
The thread that the task runs on may change if the task yields, which is called |
#
Base.Threads.maxthreadid
— Function
Threads.maxthreadid() -> Int
Gets a lower bound on the number of threads (across all thread pools) available to the Julia process, with atomic-acquire semantics. The result will always be greater than or equal to threadid(), as well as threadid(task) for any task that you were able to observe before calling maxthreadid.
#
Base.Threads.nthreads
— Function
Threads.nthreads(:default | :interactive) -> Int
Gets the current number of threads within the specified thread pool. The threads in :interactive have ID numbers 1:nthreads(:interactive), and the threads in :default have ID numbers in nthreads(:interactive) .+ (1:nthreads(:default)).
See also BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, nprocs() in the Distributed standard library, and Threads.maxthreadid().
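The ID ranges described for the two pools can be computed directly. The snippet below is a small sketch assuming Julia 1.9 or later (where the pool argument is accepted); the variable names are illustrative.

```julia
n_interactive = Threads.nthreads(:interactive)
n_default = Threads.nthreads(:default)

# ID ranges as described in the nthreads documentation.
interactive_ids = 1:n_interactive
default_ids = n_interactive .+ (1:n_default)

println("interactive pool thread IDs: ", interactive_ids)
println("default pool thread IDs:     ", default_ids)
println("current thread ", Threads.threadid(), " belongs to pool ", Threads.threadpool())
```

The printed ranges depend on how Julia was started (for example via the --threads option), so no fixed output is shown.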
#
Base.Threads.threadpool
— Function
Threads.threadpool(tid = threadid()) -> Symbol
Returns the specified thread's threadpool: :default, :interactive, or :foreign.
#
Base.Threads.nthreadpools
— Function
Threads.nthreadpools() -> Int
Returns the number of currently configured thread pools.
#
Base.Threads.threadpoolsize
— Function
Threads.threadpoolsize(pool::Symbol = :default) -> Int
Gets the number of threads available to the default thread pool (or the specified thread pool).
See also BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, and nprocs() in the Distributed standard library.
#
Base.Threads.ngcthreads
— Function
Threads.ngcthreads() -> Int
Returns the number of garbage collector (GC) threads currently configured. This includes both mark threads and concurrent sweep threads.
See also the section Multithreading.
Atomic operations
#
atomic
— Keyword
Unsafe pointer operations are compatible with loading and storing pointers declared with the _Atomic and std::atomic types in C11 and C++23, respectively. An error may be thrown if there is no support for atomically loading the Julia type T.
See also: unsafe_load, unsafe_modify!, unsafe_replace!, unsafe_store!, unsafe_swap!
#
Base.@atomic
— Macro
@atomic var
@atomic order ex
Marks var or ex as being performed atomically, if ex is a supported expression. If no order is specified, it defaults to :sequentially_consistent.
@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend
Performs the store operation expressed on the right atomically and returns the new value.
With =, this operation translates to a setproperty!(a.b, :x, new) call. With any modifying operator (such as +=), this operation translates to a modifyproperty!(a.b, :x, +, addend)[2] call.
@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2
Performs the binary operation expressed on the right atomically. Stores the result into the field in the first argument and returns the values (old, new).
This operation translates to a modifyproperty!(a.b, :x, func, arg2) call.
See the Per-field atomics section in the manual for more details.
Examples
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # fetch field x of a, with sequential consistency
1
julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2
julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3
julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10
julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
10 => 10
Compatibility: Julia 1.7
This functionality requires at least Julia 1.7.
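To show @atomic under actual contention, here is a minimal sketch of a shared counter incremented from a threaded loop. The Counter type and its field n are hypothetical names introduced for this illustration; Julia 1.7 or later is assumed.

```julia
# Hypothetical type with a single atomic field.
mutable struct Counter
    @atomic n::Int
end

c = Counter(0)

Threads.@threads for i in 1:1000
    @atomic c.n += 1          # atomic read-modify-write; no lock required
end

println(@atomic c.n)          # prints 1000
```

Without the @atomic annotation on the update, concurrent increments could be lost.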
#
Base.@atomicswap
— Macro
@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new
Stores new into a.b.x and returns the old value of a.b.x.
This operation translates to a swapproperty!(a.b, :x, new) call.
See the Per-field atomics section in the manual for more details.
Examples
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
Compatibility: Julia 1.7
This functionality requires at least Julia 1.7.
#
Base.@atomicreplace
— Macro
@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired
Performs the conditional replacement expressed by the pair atomically, returning the values (old, success::Bool), where success indicates whether the replacement was completed.
This operation translates to a replaceproperty!(a.b, :x, expected, desired) call.
See the Per-field atomics section in the manual for more details.
Examples
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
2
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)
julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
0
Compatibility: Julia 1.7
This functionality requires at least Julia 1.7.
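One common use of a conditional replacement is letting exactly one of several concurrent iterations claim a flag. The sketch below assumes Julia 1.7 or later; the Flag type and the wins counter are hypothetical names used only for illustration.

```julia
# Hypothetical type: state 0 means "unclaimed", 1 means "claimed".
mutable struct Flag
    @atomic state::Int
end

flag = Flag(0)
wins = Threads.Atomic{Int}(0)    # counts how many iterations won the claim

Threads.@threads for i in 1:100
    res = @atomicreplace flag.state 0 => 1
    # Only the single iteration that observed 0 gets success == true.
    res.success && Threads.atomic_add!(wins, 1)
end

println(wins[])                  # prints 1
```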
#
Base.@atomiconce
— Macro
@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value
Performs the conditional assignment of value atomically if the field was previously unset, returning the value success::Bool, where success indicates whether the assignment was completed.
This operation translates to a setpropertyonce!(a.b, :x, value) call.
See the Per-field atomics section in the manual for more details.
Examples
julia> mutable struct AtomicOnce
           @atomic x
           AtomicOnce() = new()
       end
julia> a = AtomicOnce()
AtomicOnce(#undef)
julia> @atomiconce a.x = 1 # set field x of a to 1 if unset, with sequential consistency
true
julia> @atomic a.x # fetch field x of a, with sequential consistency
1
julia> @atomiconce a.x = 1 # set field x of a to 1 if unset, with sequential consistency
false
Compatibility: Julia 1.11
This functionality requires at least Julia 1.11.
#
Core.AtomicMemory
— Type
AtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}
Fixed-size DenseVector{T}. Access to any of its elements is performed atomically (with :monotonic ordering). Setting any of the elements must be accomplished using the @atomic macro and explicitly specifying ordering.
Each element is independently atomic when accessed and cannot be set non-atomically. Currently, the macro |
For more information, see Atomic operations.
Compatibility: Julia 1.11
This type requires Julia 1.11 or later.
There are also optional memory ordering parameters for the unsafe set of functions that select the C/C++-compatible versions of these atomic operations, if that parameter is specified to unsafe_load, unsafe_store!, unsafe_swap!, unsafe_replace!, and unsafe_modify!.
The following APIs are deprecated, though support for them is likely to remain for several releases.
#
Base.Threads.Atomic
— Type
Threads.Atomic{T}
Holds a reference to an object of type T, ensuring that it is only accessed atomically, i.e. in a thread-safe manner.
Only certain "simple" types can be used atomically, namely the primitive boolean, integer, and floating-point types. These are Bool, Int8…Int128, UInt8…UInt128, and Float16…Float64.
New atomic objects can be created from non-atomic values; if none is specified, the atomic object is initialized with zero.
Atomic objects can be accessed using the [] notation:
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
Atomic operations use the prefix atomic_
, such as atomic_add!
, atomic_xchg!
and so on.
#
Base.Threads.atomic_cas!
— Function
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
Atomically compare-and-set x.
Atomically compares the value in x with cmp. If equal, writes newval to x. Otherwise, leaves x unmodified. Returns the old value in x. By comparing the returned value to cmp (via ===), one knows whether x was modified and now holds the new value newval.
For further details, see LLVM's cmpxchg instruction.
This function can be used to implement transactional semantics. Before the transaction, one records the value in x. After the transaction, the new value is stored only if x has not been modified in the meantime.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
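The transactional pattern described for atomic_cas! is usually written as a retry loop. The sketch below builds an atomic multiply, an operation that has no dedicated atomic_* function; atomic_mul! is a hypothetical helper written for this illustration, not part of Base.Threads.

```julia
# Hypothetical helper: atomically multiply x by factor, returning the old value.
function atomic_mul!(x::Threads.Atomic{Int}, factor::Int)
    old = x[]                                   # record the value before the "transaction"
    while true
        seen = Threads.atomic_cas!(x, old, old * factor)
        seen === old && return old              # nobody interfered: the store happened
        old = seen                              # x changed meanwhile: retry with the fresh value
    end
end

x = Threads.Atomic{Int}(1)
Threads.@threads for i in 1:10
    atomic_mul!(x, 2)
end

println(x[])                                    # prints 1024
```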
#
Base.Threads.atomic_xchg!
— Function
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T
Atomically exchanges the value in x.
Atomically exchanges the value in x with newval. Returns the old value.
For further details, see LLVM's atomicrmw xchg instruction.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
#
Base.Threads.atomic_add!
— Function
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
Atomically adds val to x.
Performs x[] += val atomically. Returns the old value. Not defined for Atomic{Bool}.
For further details, see LLVM's atomicrmw add instruction.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
#
Base.Threads.atomic_sub!
— Function
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
Atomically subtracts val from x.
Performs x[] -= val atomically. Returns the old value. Not defined for Atomic{Bool}.
For further details, see LLVM's atomicrmw sub instruction.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
#
Base.Threads.atomic_and!
— Function
Threads.atomic_and!(x::Atomic{T}, val::T) where T
Atomically performs a bitwise and of x with val.
Performs x[] &= val atomically. Returns the old value.
For further details, see LLVM's atomicrmw and instruction.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
#
Base.Threads.atomic_nand!
— Function
Threads.atomic_nand!(x::Atomic{T}, val::T) where T
Atomically performs a bitwise nand (not-and) of x with val.
Performs x[] = ~(x[] & val) atomically. Returns the old value.
For further details, see LLVM's atomicrmw nand instruction.
Examples
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
#
Base.Threads.atomic_or!
— Function
Threads.atomic_or!(x::Atomic{T}, val::T) where T
Atomically performs a bitwise or of x with val.
Performs x[] |= val atomically. Returns the old value.
For further details, see LLVM's atomicrmw or instruction.
Examples
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
#
Base.Threads.atomic_xor!
— Function
Threads.atomic_xor!(x::Atomic{T}, val::T) where T
Atomically performs a bitwise exclusive or (xor) of x with val.
Performs x[] ⊻= val atomically. Returns the old value.
For further details, see LLVM's atomicrmw xor instruction.
Examples
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
#
Base.Threads.atomic_max!
— Function
Threads.atomic_max!(x::Atomic{T}, val::T) where T
Atomically stores the maximum of x and val in x.
Performs x[] = max(x[], val) atomically. Returns the old value.
For further details, see LLVM's atomicrmw max instruction.
Examples
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
#
Base.Threads.atomic_min!
— Function
Threads.atomic_min!(x::Atomic{T}, val::T) where T
Atomically stores the minimum of x and val in x.
Performs x[] = min(x[], val) atomically. Returns the old value.
For further details, see LLVM's atomicrmw min instruction.
Examples
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
#
Base.Threads.atomic_fence
— Function
Threads.atomic_fence()
Inserts a sequential-consistency memory fence.
Inserts a memory fence with sequentially consistent ordering semantics. There are algorithms where this is needed, i.e. where an acquire/release ordering is insufficient.
This is likely a very expensive operation. Given that all other atomic operations in Julia already have acquire/release semantics, explicit fences should not be necessary in most cases.
For further details, see LLVM's fence instruction.
Calling ccall using libuv thread pool (experimental feature)
#
Base.@threadcall
— Macro
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
The @threadcall macro is called in the same way as ccall, but does the work in a different thread. This is useful when you want to call a blocking C function without causing the current julia thread to become blocked. Concurrency is limited by the size of the libuv thread pool, which defaults to 4 threads but can be increased by setting the UV_THREADPOOL_SIZE environment variable and restarting the julia process.
Note that the called function should never call back into Julia.
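As a rough sketch of the call shape, the snippet below runs a blocking C sleep on the libuv thread pool. It assumes a POSIX system whose C library exports usleep (taking an unsigned microsecond count and returning an int); that symbol is an assumption about the platform, not something provided by Julia, so the call is guarded by Sys.isunix().

```julia
if Sys.isunix()
    # Assumption: POSIX `int usleep(useconds_t)` is available in the process.
    t = @async @threadcall(:usleep, Cint, (Cuint,), 200_000)  # block ~0.2 s off-thread
    println("the Julia thread keeps running while usleep blocks elsewhere")
    status = fetch(t)   # wait for the C call; usleep returns 0 on success
    println("usleep returned ", status)
end
```

Because usleep never calls back into Julia, it satisfies the restriction noted above.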
Low-level synchronization primitives
These building blocks are used to create regular synchronization objects.
#
Base.Threads.SpinLock
— Type
SpinLock()
Creates a non-reentrant, test-and-test-and-set spin lock. Recursive use will result in a deadlock. This kind of lock should only be used around code that takes little time to execute and does not block (e.g. perform I/O). In general, ReentrantLock should be used instead.
Each lock must be matched with an unlock. If !islocked(lck::SpinLock) holds, trylock(lck) succeeds unless there are other tasks attempting to hold the lock "at the same time."
Test-and-test-and-set spin locks are quickest up to about 30 contending threads. If you have more contention than that, different synchronization approaches should be considered.
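A minimal usage sketch follows, pairing every lock with an unlock around a very short, non-blocking critical section. The names lk and counter are illustrative only.

```julia
lk = Threads.SpinLock()
counter = Ref(0)              # plain (non-atomic) shared state, protected by lk

Threads.@threads for i in 1:1000
    lock(lk)
    try
        counter[] += 1        # short critical section: no I/O, no yielding
    finally
        unlock(lk)            # always released within the same iteration
    end
end

println(counter[])            # prints 1000
```

For critical sections that may block or take longer, ReentrantLock is the safer choice, as the description above recommends.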