Engee documentation

Multithreading

Threads.@threads [schedule] for ... end

A macro that executes a for loop in parallel. The iteration space is distributed among coarse-grained tasks. This policy can be set with the schedule argument. Execution of the loop waits for all iterations to be evaluated.

See also @spawn, and pmap in Distributed.

Advanced Help

Semantics

Unless stronger guarantees are specified by the scheduling option, a loop executed by the @threads macro has the following semantics.

The @threads macro executes the loop body in an unspecified order and potentially concurrently. It does not specify the exact assignment of iterations to tasks and worker threads, and the assignment may differ from one execution to the next. The loop body code (including all code transitively called from it) must not make any assumptions about how iterations are distributed across tasks or about the worker thread on which they run. The loop body for each iteration must be able to make forward progress independently of the other iterations and must be free of data races. As a consequence, invalid synchronization across iterations may deadlock, and unsynchronized memory accesses may result in undefined behavior.

For example, the above conditions imply the following:

  • A lock taken in an iteration must be released within the same iteration.

  • Communicating between iterations using blocking primitives such as Channel is incorrect.

  • Writes go only to locations that are not shared across iterations (unless a lock or an atomic operation is used).

  • Unless the :static schedule is used, the value of threadid() may change even within a single iteration. See the section Task Migration.
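The conditions above can be illustrated with a minimal sketch (the function names are hypothetical). The first loop writes only to its own element of out, so it needs no synchronization; the second updates a shared counter, so it takes a ReentrantLock and releases it within the same iteration.

```julia
# Each iteration writes only to its own element of `out`: no data race.
function squares_threaded(n::Int)
    out = zeros(Int, n)
    Threads.@threads for i in 1:n
        out[i] = i^2
    end
    return out
end

# A shared counter is updated under a lock that is taken and released
# within the same iteration.
function count_even_threaded(xs)
    counter = Ref(0)
    lk = ReentrantLock()
    Threads.@threads for x in xs
        if iseven(x)
            lock(lk) do
                counter[] += 1
            end
        end
    end
    return counter[]
end
```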

Schedulers

Without the scheduler argument, the exact scheduling is unspecified and may vary across Julia releases. Currently, :dynamic is used when no scheduler is specified.

Compatibility: Julia 1.5

The schedule argument was first implemented in Julia 1.5.

:dynamic (default)

The :dynamic scheduler executes iterations dynamically on the available worker threads. The current implementation assumes that the workload of each iteration is uniform; this assumption may be removed in the future.

This scheduling option is merely a hint to the underlying execution mechanism. Nevertheless, a few properties can be expected. The number of Tasks used by the :dynamic scheduler is bounded by a small constant multiple of the number of available worker threads (Threads.threadpoolsize()). Each task processes contiguous regions of the iteration space. Thus, @threads :dynamic for x in xs; f(x); end is typically more efficient than @sync for x in xs; @spawn f(x); end if length(xs) is significantly larger than the number of worker threads and the run time of f(x) is small compared with the cost of spawning and synchronizing a task (typically less than 10 microseconds).

Compatibility: Julia 1.8

The :dynamic option of the schedule argument is available, and is the default, as of Julia 1.8.
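As a sketch of the comparison described above, the two functions below compute the same result. The first lets :dynamic hand contiguous chunks of indices to a bounded number of tasks; the second spawns one task per element and therefore pays the spawn and synchronization cost length(xs) times. The element function cheap_work is a hypothetical stand-in for a cheap workload.

```julia
cheap_work(x) = x + 1  # hypothetical cheap per-element work

# A bounded number of tasks, each processing contiguous chunks of indices.
function apply_dynamic!(out, xs)
    Threads.@threads :dynamic for i in eachindex(out, xs)
        out[i] = cheap_work(xs[i])
    end
    return out
end

# One task per element: correct, but spawning and syncing dominate for cheap work.
function apply_spawn_each!(out, xs)
    @sync for i in eachindex(out, xs)
        Threads.@spawn out[i] = cheap_work(xs[i])
    end
    return out
end
```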

:greedy

The :greedy scheduler spawns up to Threads.threadpoolsize() tasks, each of which greedily works on the iterated values as they are produced. As soon as a task finishes its current item, it takes the next value from the iterator. The values processed by any single task are not necessarily adjacent in the iterator. The iterator may produce values forever; only the iterator interface is required (no indexing).

This scheduling option is generally a good choice when the workload of individual iterations is not uniform or has a large spread.

Compatibility: Julia 1.11

The :greedy value of the schedule argument is available as of Julia 1.11.
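A minimal sketch of :greedy, assuming Julia 1.11 or later. It uses an iterator that supports only the iteration interface (Iterators.filter has no indexing) and a deliberately uneven, hypothetical workload work(n). Per-item results are combined under a lock; because floating-point additions happen in a nondeterministic order, the result should be compared approximately.

```julia
# Hypothetical uneven workload: cost grows with n.
work(n) = sum(sqrt(k) for k in 1:n)

function total_greedy(ns)
    acc = Ref(0.0)
    lk = ReentrantLock()
    Threads.@threads :greedy for n in ns
        w = work(n)            # the expensive part runs without holding the lock
        lock(lk) do
            acc[] += w
        end
    end
    return acc[]
end

odds = Iterators.filter(isodd, 1:200)  # not indexable, only iterable
total = total_greedy(odds)
```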

:static

The :static scheduler creates one task per thread and divides the iterations equally among them, assigning each task specifically to one thread. In particular, the value of threadid() is guaranteed to be constant within a single iteration. Specifying :static is an error if it is used from inside another @threads loop or from a thread other than 1.

:static scheduling exists to support the transition of code written before Julia 1.3. In newly written library functions, :static scheduling is discouraged, because functions using this option cannot be called from arbitrary worker threads.

Examples

To illustrate the different scheduling strategies, consider the following function busywait, which contains a non-yielding timed loop that runs for a given number of seconds.

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

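The :dynamic example takes about 2 seconds, since one of the unoccupied threads can run two of the 1-second iterations to complete the for loop. In the :static example, the iteration assigned to the thread that is running busywait(5) cannot start until that call finishes, so the loop takes about 6 seconds.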

Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

Similar to foreach(f, channel), but iteration over channel and calls to f are split across ntasks tasks spawned by Threads.@spawn. This function waits for all internally spawned tasks to complete before returning.

If schedule isa FairSchedule, Threads.foreach attempts to spawn tasks in a way that lets Julia's scheduler balance work items across threads more freely. This approach generally has higher per-item overhead, but may perform better than StaticSchedule when running concurrently with other multithreaded workloads.

If schedule isa StaticSchedule, Threads.foreach spawns tasks in a way that incurs lower per-item overhead than FairSchedule, but is less amenable to load balancing. This approach may therefore be better suited to fine-grained, uniform workloads, but may perform worse than FairSchedule when running concurrently with other multithreaded workloads.

Examples

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]

Compatibility: Julia 1.6

This feature requires a Julia version of at least 1.6.
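The sketch below runs a producer/consumer pipeline like the example above under both schedule policies. Because tasks finish in no particular order, the squares are sorted before being returned. The function name squares_via_foreach is hypothetical, and ntasks = 2 is an arbitrary choice.

```julia
function squares_via_foreach(n; schedule = Threads.FairSchedule())
    # Producer: a channel that yields 1:n and is closed when its task finishes.
    src = Channel{Int}(n) do ch
        foreach(i -> put!(ch, i), 1:n)
    end
    # Buffered result channel, so put! never blocks the worker tasks.
    results = Channel{Int}(n)
    Threads.foreach(i -> put!(results, i^2), src; schedule = schedule, ntasks = 2)
    close(results)  # Threads.foreach has returned, so every item has been put
    return sort!(collect(results))
end
```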

Threads.@spawn [:default|:interactive] expr

Creates a Task and schedules it to run on any available thread in the specified thread pool (:default if not specified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or call fetch to wait for it and then obtain its return value.

Values can be interpolated into @spawn via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.

The thread on which the task runs may change if the task yields, so threadid() should not be treated as constant for a task. See Task Migration and the broader multithreading manual for further important caveats. See also the chapter on thread pools.

Compatibility: Julia 1.3

This macro was first implemented in Julia 1.3.

Compatibility: Julia 1.4

Interpolation of values using $ is available starting from Julia 1.4.

Compatibility: Julia 1.9

The thread pool can be specified as of Julia 1.9.

Examples

julia> t() = println("Hello from ", Threads.threadid());

julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
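
A minimal sketch of interpolation and fetch (spawn_demo is a hypothetical name). $x copies the current value of x into the task when it is created, so rebinding x afterwards does not affect that task; fetch waits for each task and returns its value.

```julia
function spawn_demo()
    x = 1
    t = Threads.@spawn $x + 1   # the task uses the value 1, copied at spawn time
    x = 100                     # rebinding x later does not affect t
    partial = [Threads.@spawn sum(1:k) for k in 1:4]
    return fetch(t), fetch.(partial)
end
```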
Threads.threadid() -> Int

Gets the ID number of the current thread of execution. The main thread has ID 1.

Examples

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4

The thread on which a task runs may change if the task yields; this is known as Task Migration. For this reason, in most cases it is not safe to use threadid() to index into, say, a vector of buffers or stateful objects.
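A common alternative to indexing per-thread buffers with threadid() is to give each task its own chunk of the input and its own accumulator, then combine the per-task results. The sketch below (chunked_sum is a hypothetical name) uses Iterators.partition for the chunks; its result does not depend on which thread each task runs on or migrates to.

```julia
function chunked_sum(xs::AbstractVector; ntasks::Int = Threads.nthreads())
    isempty(xs) && return zero(eltype(xs))
    chunk_len = cld(length(xs), ntasks)
    # One task per chunk; each task owns its accumulator (its local sum).
    tasks = map(Iterators.partition(xs, chunk_len)) do chunk
        Threads.@spawn sum(chunk)
    end
    return sum(fetch, tasks)
end
```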

Threads.maxthreadid() -> Int

Gets a lower bound on the number of threads (across all thread pools) available to the Julia process, with atomic-acquire semantics. The result is always greater than or equal to threadid(), as well as to threadid(task) for any task you were able to observe before calling maxthreadid.

Threads.nthreads(:default | :interactive) -> Int

Gets the current number of threads in the specified thread pool. Threads in :interactive have ID numbers 1:nthreads(:interactive), and threads in :default have ID numbers in nthreads(:interactive) .+ (1:nthreads(:default)).

See also BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, nprocs() in the Distributed standard library, and Threads.maxthreadid().

Threads.threadpool(tid = threadid()) -> Symbol

Returns the thread pool of the specified thread: :default, :interactive, or :foreign.

Threads.nthreadpools() -> Int

Returns the number of currently configured thread pools.

Threads.threadpoolsize(pool::Symbol = :default) -> Int

Gets the number of threads available to the default thread pool (or the specified thread pool).

See also BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, and nprocs() in the Distributed standard library.

Threads.ngcthreads() -> Int

Returns the number of garbage collector threads currently configured. This includes both mark threads and concurrent sweep threads.

See also the section Multithreading.
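A small sketch that gathers the thread-introspection functions above into a named tuple (thread_summary is a hypothetical name). The values depend on how Julia was started, for example with julia --threads; the sketch assumes Julia 1.10 or later, so that all of these functions are available.

```julia
function thread_summary()
    return (
        current_id   = Threads.threadid(),
        current_pool = Threads.threadpool(),
        default_size = Threads.threadpoolsize(),
        interactive  = Threads.nthreads(:interactive),
        npools       = Threads.nthreadpools(),
        max_id       = Threads.maxthreadid(),
        gc_threads   = Threads.ngcthreads(),
    )
end
```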

Atomic operations

Unsafe pointer operations are compatible with loading and storing pointers declared with the _Atomic type in C11 and the std::atomic type in C++23, respectively. An error may be thrown if atomically loading the Julia type T is not supported.

@atomic var
@atomic order ex

Marks var or ex as being performed atomically, if ex is a supported expression. If no order is specified, it defaults to :sequentially_consistent.

@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend

Performs the store operation expressed on the right-hand side atomically and returns the new value.

With =, this operation translates to a setproperty!(a.b, :x, new) call. With an update operator such as +=, it translates to a modifyproperty!(a.b, :x, +, addend)[2] call.

@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2

Performs the binary operation expressed on the right-hand side atomically. Stores the result in the field given by the first argument and returns the values (old, new) as the pair old => new.

This operation translates to a modifyproperty!(a.b, :x, func, arg2) call.

For more information, see the section Per-field atomics in the manual.

Examples

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # fetch field x of a, with sequential consistency
1

julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2

julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3

julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4

julia> @atomic a.x # fetch field x of a, with sequential consistency
4

julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10

julia> @atomic a.x max 5 # change field x of a to the max value again, with sequential consistency
10 => 10

Compatibility: Julia 1.7

This feature requires Julia 1.7 or later.
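As a minimal sketch of per-field atomics in a parallel loop (Counter and count_parallel are hypothetical names), every iteration increments the same @atomic field. Because @atomic c.n += 1 is a single atomic read-modify-write, no updates are lost.

```julia
mutable struct Counter
    @atomic n::Int
end

function count_parallel(iters::Int)
    c = Counter(0)
    Threads.@threads for _ in 1:iters
        @atomic c.n += 1   # atomic read-modify-write
    end
    return @atomic c.n     # atomic load of the final value
end
```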

@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

Stores new into a.b.x and returns the old value of a.b.x.

This operation translates to a swapproperty!(a.b, :x, new) call.

For more information, see the section Per-field atomics in the manual.

Examples

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1

julia> @atomic a.x # fetch field x of a, with sequential consistency
4

Compatibility: Julia 1.7

This feature requires Julia 1.7 or later.
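A sketch of a common use of @atomicswap: taking the current value and clearing the field in one atomic step, so that a message is handed out at most once even when several tasks race for it. Mailbox and take_msg! are hypothetical names.

```julia
mutable struct Mailbox
    @atomic msg::Union{Nothing,String}
end

# Returns the stored message (or nothing) and clears the slot atomically.
function take_msg!(m::Mailbox)
    return @atomicswap m.msg = nothing
end
```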

@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

Performs the conditional replacement expressed by the pair atomically, returning the values (old, success::Bool), where success indicates whether the replacement was performed.

This operation translates to a replaceproperty!(a.b, :x, expected, desired) call.

For more information, see the section Per-field atomics in the manual.

Examples

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
2

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)

julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
0

Compatibility: Julia 1.7

This feature requires Julia 1.7 or later.
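@atomicreplace is the usual building block for compare-and-swap retry loops. The hypothetical helper update! below applies an arbitrary function to an @atomic field; whenever another task changed the field in between, the replacement fails and the loop retries with the freshly observed value. Box is likewise a hypothetical type.

```julia
mutable struct Box
    @atomic x::Int
end

# Atomically replace b.x with f(b.x); returns the value that was stored.
function update!(f, b::Box)
    old = @atomic b.x
    while true
        desired = f(old)
        old, ok = @atomicreplace b.x old => desired
        ok && return desired
        # On failure, `old` now holds the current value; retry with it.
    end
end
```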

@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value

Performs the conditional assignment of value atomically if the field has not been set yet, returning success::Bool, where success indicates whether the assignment was performed.

This operation translates to a setpropertyonce!(a.b, :x, value) call.

For more information, see the section Per-field atomics in the manual.

Examples

julia> mutable struct AtomicOnce
           @atomic x
           AtomicOnce() = new()
       end

julia> a = AtomicOnce()
AtomicOnce(#undef)

julia> @atomiconce a.x = 1 # set field x of a to 1, if unset, with sequential consistency
true

julia> @atomic a.x # fetch field x of a, with sequential consistency
1

julia> @atomiconce a.x = 1 # set field x of a to 1, if unset, with sequential consistency
false

Compatibility: Julia 1.11

This feature requires Julia 1.11 or later.
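A sketch of @atomiconce used to publish a value exactly once even when several tasks race to initialize it, assuming Julia 1.11 or later. OnceCell and init_once! are hypothetical names; as in the example above, the field is left untyped so that it can start out undefined.

```julia
mutable struct OnceCell
    @atomic value          # untyped, so it can start out undefined
    OnceCell() = new()
end

# Try to store v; only the first successful call wins.
# Returns (whether this call stored v, the value now in the cell).
function init_once!(cell::OnceCell, v)
    stored = @atomiconce cell.value = v
    return stored, (@atomic cell.value)
end
```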

AtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}

A fixed-size DenseVector{T}. Access to any of its elements is performed atomically (with :monotonic ordering). Setting any element must be done with the @atomic macro and an explicitly specified ordering.

Each element is independently atomic when accessed and cannot be set non-atomically. Currently, the @atomic macro and the higher-level interface are not complete, but the building blocks for a future implementation are the internal intrinsics Core.memoryrefget, Core.memoryrefset!, Core.memoryref_isassigned, Core.memoryrefswap!, Core.memoryrefmodify!, and Core.memoryrefreplace!.

For more information, see Atomic operations.

Compatibility: Julia 1.11

This type requires a Julia version not lower than 1.11.

The unsafe set of functions also accepts optional memory ordering arguments, which select the C/C++-compatible versions of these atomic operations when that argument is passed to unsafe_load, unsafe_store!, unsafe_swap!, unsafe_replace!, or unsafe_modify!.
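A minimal sketch of the ordering arguments on raw pointers, assuming Julia 1.10 or later. The pointer comes from a one-element Vector that is kept alive with GC.@preserve; pointer_atomics_demo is a hypothetical name, and unsafe_modify! is called with the Base. prefix in case it is not exported in your Julia version.

```julia
function pointer_atomics_demo()
    buf = [0]
    result = GC.@preserve buf begin
        p = pointer(buf)
        unsafe_store!(p, 5, :release)                          # atomic store
        seen = unsafe_load(p, :acquire)                        # atomic load
        pair = Base.unsafe_modify!(p, +, 1, :acquire_release)  # returns old => new
        (seen, pair, buf[1])
    end
    return result
end
```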

The following APIs are deprecated, although support for them is likely to remain for several more releases.

Threads.Atomic{T}

Holds a reference to an object of type T, ensuring that it is accessed only atomically, that is, in a thread-safe manner.

Only certain "simple" types can be used atomically, namely the primitive boolean, integer, and floating-point types: Bool, Int8…Int128, UInt8…UInt128, and Float16…Float64.

New atomic objects can be created from non-atomic values; if no value is specified, the atomic object is initialized with zero.

Atomic objects can be accessed using the [] notation:

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

Atomic operations use the atomic_ prefix, as in atomic_add!, atomic_xchg!, and so on.

Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

Atomically compares and swaps x.

Atomically compares the value in x with cmp. If they are equal, newval is written to x; otherwise, x is left unchanged. Returns the old value of x. By comparing the returned value with cmp (using ===), you can tell whether x was modified and now holds the new value newval.

For more information, see the LLVM cmpxchg instruction.

This function can be used to implement transactional semantics. Before the transaction, the current value of x is recorded. After the transaction, the new value is stored only if x has not been modified in the meantime.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
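
atomic_cas! is typically used in a retry loop. The hypothetical helper increment_below! increments x only while it is below limit: it reads the current value, attempts the swap, and retries if another task changed x in between, which it detects because the returned value differs from the value it read.

```julia
function increment_below!(x::Threads.Atomic{Int}, limit::Int)
    while true
        old = x[]
        old >= limit && return false   # already at the limit
        # Succeeds only if x still holds `old`.
        Threads.atomic_cas!(x, old, old + 1) === old && return true
    end
end
```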
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

Atomically replaces the value in x.

Atomically replaces the value in x with newval. Returns the old value.

For more information, see the LLVM atomicrmw xchg instruction.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
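
Because atomic_xchg! returns the previous value, it can serve as a test-and-set: the caller that swaps true into a flag that was false is the one that acquired it. A minimal sketch with hypothetical names (TASFlag, try_acquire!, release!); it only illustrates the operation and is not a replacement for SpinLock or ReentrantLock.

```julia
struct TASFlag
    held::Threads.Atomic{Bool}
end
TASFlag() = TASFlag(Threads.Atomic{Bool}(false))

# true if this call changed the flag from false to true.
try_acquire!(f::TASFlag) = !Threads.atomic_xchg!(f.held, true)
release!(f::TASFlag) = (f.held[] = false; nothing)
```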
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Atomically adds val to x.

Performs x[] += val atomically. Returns the old value. Not defined for Atomic{Bool}.

For more information, see the LLVM atomicrmw add instruction.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
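
A minimal sketch of atomic_add! as a shared accumulator in a threaded loop (parallel_sum is a hypothetical name). It is correct, but every iteration contends on the same memory location, so for large inputs a per-task reduction is usually faster.

```julia
function parallel_sum(xs::AbstractVector{Int})
    total = Threads.Atomic{Int}(0)
    Threads.@threads for i in eachindex(xs)
        Threads.atomic_add!(total, xs[i])   # atomic total[] += xs[i]
    end
    return total[]
end
```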
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Atomically subtracts val from x.

Performs x[] -= val atomically. Returns the old value. Not defined for Atomic{Bool}.

For more information, see the LLVM atomicrmw sub instruction.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1

Threads.atomic_and!(x::Atomic{T}, val::T) where T

Atomically performs a bitwise "and" of x and val.

Performs x[] &= val atomically. Returns the old value.

For more information, see the LLVM atomicrmw and instruction.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2

Threads.atomic_nand!(x::Atomic{T}, val::T) where T

Atomically performs a bitwise "nand" (not-and) of x and val.

Performs x[] = ~(x[] & val) atomically. Returns the old value.

For more information, see the LLVM atomicrmw nand instruction.

Examples

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3

Threads.atomic_or!(x::Atomic{T}, val::T) where T

Atomically performs a bitwise "or" of x and val.

Performs x[] |= val atomically. Returns the old value.

For more information, see the LLVM atomicrmw or instruction.

Examples

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
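
atomic_or! is convenient for setting bits in a shared mask from many iterations without losing updates. A sketch with a hypothetical function name that sets one bit for each index in bits:

```julia
function collect_flags(bits::AbstractVector{Int})
    mask = Threads.Atomic{UInt64}(UInt64(0))
    Threads.@threads for b in bits
        Threads.atomic_or!(mask, UInt64(1) << b)   # set bit b
    end
    return mask[]
end
```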
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

Atomically performs a bitwise "xor" (exclusive or) of x and val.

Performs x[] ⊻= val atomically. Returns the old value.

For more information, see the LLVM atomicrmw xor instruction.

Examples

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2

Threads.atomic_max!(x::Atomic{T}, val::T) where T

Atomically stores the maximum of x and val in x.

Performs x[] = max(x[], val) atomically. Returns the old value.

For more information, see the LLVM atomicrmw max instruction.

Examples

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
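
A minimal sketch of atomic_max! as a parallel maximum reduction (parallel_max is a hypothetical name). Starting from typemin(Int) means any element replaces the initial value.

```julia
function parallel_max(xs::AbstractVector{Int})
    best = Threads.Atomic{Int}(typemin(Int))
    Threads.@threads for x in xs
        Threads.atomic_max!(best, x)   # best[] = max(best[], x), atomically
    end
    return best[]
end
```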
Threads.atomic_min!(x::Atomic{T}, val::T) where T

Atomically stores the minimum of x and val in x.

Performs x[] = min(x[], val) atomically. Returns the old value.

For more information, see the LLVM atomicrmw min instruction.

Examples

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5

Threads.atomic_fence()

Inserts a sequentially consistent memory barrier.

Inserts a memory barrier with sequentially consistent ordering semantics. Some algorithms need this, namely those for which acquire/release ordering is insufficient.

This is likely a very expensive operation. Given that all other atomic operations in Julia already have acquire/release semantics, explicit barriers should not be necessary in most cases.

For more information, see the LLVM fence instruction.

Calling ccall using the libuv thread pool (experimental)

@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

The @threadcall macro is called in the same way as ccall, but does the work in a different thread. This is useful when you want to call a blocking C function without blocking the current Julia thread. Concurrency is limited by the size of the libuv thread pool, which has 4 threads by default; this limit can be raised by setting the UV_THREADPOOL_SIZE environment variable and restarting the Julia process.

Note that the called function should never call back into Julia.

Low-level synchronization primitives

These building blocks are used to create the regular synchronization objects.

SpinLock()

Creates a non-reentrant test-and-test-and-set spin lock. Recursive use results in a deadlock. This kind of lock should only be used around code that takes little time to execute and does not block (for example, by performing I/O). In general, ReentrantLock should be used instead.

Each lock must be matched with an unlock. If !islocked(lck::SpinLock) holds, trylock(lck) succeeds unless other tasks are attempting to take the lock at the same time.

Test-and-test-and-set spin locks are fastest with up to about 30 contending threads. With more contention than that, other synchronization approaches should be considered.
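
A minimal sketch of the intended usage pattern: a very short, non-blocking critical section, with every lock matched by an unlock in a finally clause so that an exception cannot leave the lock held. The function name spin_count is hypothetical.

```julia
function spin_count(n::Int)
    lk = Threads.SpinLock()
    counter = Ref(0)
    Threads.@threads for _ in 1:n
        lock(lk)
        try
            counter[] += 1   # short critical section: no I/O, no yielding
        finally
            unlock(lk)
        end
    end
    return counter[]
end
```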