Tasks
#
Core.Task — Type
Task(func)
Creates a Task (i.e. a coroutine) to execute this func function (which must be called without arguments). The task terminates when this function returns. The task will be performed in the hierarchy of determining the "age of the world" methods from the parent object during construction during schedule.
|
By default, the pinning bit is set to true |
Examples
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
In this example, b represents a ready-to-run task that has not yet been started.
#
Base.@task — Macro
@task
Wraps the expression in Task, without executing it, and returns Task. This action only creates a task, not executes it.
|
By default, the pinning bit is set to true |
Examples
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.@async — Macro
@async
Wraps the expression in Task and adds it to the scheduler queue of the local computer.
Values can be interpolated into @async using $, which copies the value directly into the constructed base closure. This allows you to insert the _value of a variable, isolating asynchronous code from changes in the value of the variable in the current task.
|
It is strongly recommended to always give preference to |
|
Compatibility: Julia 1.4
Interpolation of values using |
#
Base.asyncmap — Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
Uses multiple parallel tasks to map f to a collection (or multiple collections of the same length). For multiple arguments of the collection, f is applied element-by-element.
'ntasks` indicates the number of tasks to run in parallel. If ntasks is not specified, up to 100 tasks will be used for parallel matching, depending on the length of the collections.
'ntasks` can also be specified as a function without arguments. In this case, the number of tasks running in parallel is checked before processing each element, and a new task is started if the value of ntasks_func is greater than the current number of tasks.
If batch_size is specified, the collection is processed in batch mode. In this case, f' should be a function that accepts a vector of tuples of arguments and returns a vector of results. The length of the input vector must be equal to the length of `batch_size or less.
The following examples demonstrate how to perform various tasks by returning the objectid of the tasks in which the matching function is performed.
First, if the ntasks value is not set, each item is processed in a different task.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
If the value is `ntasks=2', all items are processed in two tasks.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
If the batch_size value is set, the matching function must be modified so that it accepts an array of tuples of arguments and returns an array of results. To do this, use map in the modified mapping function.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
#
Base.istaskdone — Function
istaskdone(t::Task) -> Bool
Determines whether the task has completed its work.
Examples
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.istaskstarted — Function
istaskstarted(t::Task) -> Bool
Determines whether the task has started execution.
Examples
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
#
Base.istaskfailed — Function
istaskfailed(t::Task) -> Bool
Determines whether the task has completed due to an exception.
Examples
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
|
Compatibility: Julia 1.3
This feature requires a Julia version of 1.3 or higher. |
#
Base.task_local_storage — Method
task_local_storage(key)
Searches for the key value in the task’s local storage for the current task.
#
Base.task_local_storage — Method
task_local_storage(key, value)
Assigns a value to a key in the task’s local storage for the current task.
#
Base.task_local_storage — Method
task_local_storage(body, key, value)
Calls the 'body` function with the changed storage, local to the task, in which the value value is assigned to key; the previous key value or its absence is restored later. Useful for emulating dynamic scoping.
Planning
#
Base.yield — Function
yield()
Switches to the scheduler to allow the execution of another scheduled task. The task that calls this function is still ready to run and will be restarted immediately if there are no other tasks ready to run.
yield(t::Task, arg = nothing)
A fast version of schedule(t, arg); yield() with inaccurate scheduling, which immediately outputs t before calling the scheduler.
#
Base.yieldto — Function
yieldto(t::Task, arg = nothing)
Switches to a given task. When switching for the first time, the task function is called without arguments. On subsequent switches, the arg from the last task yieldto call is returned. This is a low-level call that only switches tasks, in no way considering states or scheduling. Its use is not recommended.
#
Base.sleep — Function
sleep(seconds)
Blocks the current task for the specified number of seconds. The minimum waiting time is 1 millisecond (enter the value 0.001).
#
Base.schedule — Function
schedule(t::Task, [val]; error=false)
Adds a task Task to the scheduler queue. This ensures that the task is constantly running when the system is inactive, only if the task is not performing a blocking operation, for example wait.
If the second argument val is specified, it will be passed to the task (via the return value yieldto') when restarting. If `error has the value `true', the value is called as an exception in the awakened task.
|
It is incorrect to use |
|
By default, the pinning bit is set to true |
Examples
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
#
Base.errormonitor — Function
errormonitor(t::Task)
Outputs the error log to stderr when task t fails.
Examples
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
#
Base.@sync — Macro
@sync
Waits until all lexically determined uses @async, @spawn, Distributed.@spawnat and Distributed.@distributed will be executed. All exceptions thrown by nested asynchronous operations are collected and issued as CompositeException.
Examples
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
#
Base.wait — Function
Special note for Threads.Condition.
The caller must hold the lock (lock), which owns Threads.Condition, before calling this method. The calling task will be blocked until it is awakened by some other task, usually by calling notify for the same Threads' object.Condition. If a lock was performed, it will be atomically released (even if it was performed recursively) and will be re-acquired before control returns.
wait(r::Future)
Wait for a value to become available for the specified Future.
wait(r::RemoteChannel, args...)
Wait for a value to become available on the specified RemoteChannel.
wait([x])
Blocks the current task until an event occurs, depending on the type of argument.:
-
Channel: Wait for a value to be appended to the channel. -
Condition: Wait fornotifyon a condition and return theval' parameter passed to `notify'. Waiting for the condition additionally allows you to pass `first=true, which causes the waiting task to be placed first in the notification wake queue (`notify') instead of the usual FIFO behavior. -
Process: Waits for a process or chain of processes to complete. The `exitcode' field of a process can be used to determine whether it was successful or failed. -
Task: Wait for aTaskto finish. If the task fails with an exception, aTaskFailedExceptionis issued (which encapsulates the failed task). -
RawFD: Wait for changes on a file descriptor (see theFileWatchingpackage).
If no argument is passed, the task is blocked for an indefinite period of time. The task can be restarted only by explicitly calling schedule or yieldto.
'wait` is often called in the 'while` loop to ensure that an expected condition is met before continuing.
wait(c::Channel)
Blocks the task until the Channel' is ready (`isready).
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # задача блокируется, так как канал не готов
false
julia> put!(c, 1);
julia> istaskdone(task) # теперь задача разблокирована
true
#
Base.fetch — Method
fetch(t::Task)
Awaiting task completion Task, and then returns the value of its result. If the task fails with an exception, it is issued TaskFailedException (which encapsulates the failed task).
#
Base.timedwait — Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
Waits for testcb()`will return the value `true or when the timeout seconds have passed, depending on what happens earlier. The test function is polled every pollint seconds. The minimum value for pollint is 0.001 s, i.e. 1 millisecond.
Returns :ok or `:timed_out'.
Examples
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
#
Base.Condition — Type
Condition()
Creates a front-end event source that tasks can expect. Tasks that cause wait for Condition', are suspended and placed in the queue. Tasks are awakened when `notify is later called for Condition'. Waiting for a condition may return a value or cause an error if optional arguments are used. `notify. Running at the front means that only tasks that are waiting during a call can be woken up. notify. For level-triggered notifications, an additional state must be maintained to track whether a notification has occurred. This is done using types Channel and Threads.Event, and they can be used for level-triggered events.
This object is not thread-safe. For information about the thread-safe version, see the description Threads.Condition.
#
Base.Threads.Condition — Type
Threads.Condition([lock])
Thread-safe version Base.Condition.
To call wait or notify for Threads.Condition, must first be called for it lock. When wait' is called, the lock is atomically released during the lock and will be reclaimed before `wait returns. Hence the idiomatic use of Threads.Condition c looks like this:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
|
Compatibility: Julia 1.2
This feature requires a Julia version of at least 1.2. |
#
Base.Event — Type
Event([autoreset=false])
Creates a level-triggered event source. Tasks that cause wait for Event', are paused and queued until called `notify for Event'. After calling `notify, the Event remains in the alarm state, and tasks will no longer block, waiting for it while reset is called.
If autoreset' is set to true, a maximum of one task will be released from `wait for each notify call.
This makes it possible to organize the memory when it is received and released for notification or waiting.
|
Compatibility: Julia 1.1
This feature requires a Julia version of at least 1.1. |
|
Compatibility: Julia 1.8
The |
#
Base.notify — Function
notify(condition, val=nothing; all=true, error=false)
Wakes up the tasks that are waiting for the condition by passing them to val'. If `all is set to true (by default), all pending tasks are awakened, otherwise only one is. If error has the value `true', the passed value is called as an exception in the awakened tasks.
Returns the number of awakened tasks. Returns 0 if there are no tasks waiting for `condition'.
#
Base.Semaphore — Type
Semaphore(sem_size)
Creates a counting semaphore that allows you to use the maximum sem_size of receives at any time. Each receipt must correspond to an exemption.
This makes it possible to arrange the memory when receiving and releasing for receiving or releasing.
#
Base.acquire — Function
acquire(s::Semaphore)
Waits for one of the sem_size permissions to be available, locking it until it can be obtained.
acquire(f, s::Semaphore)
Executes f after receiving s from the semaphore and releasing (release) upon completion or error.
For example, the do-block form, which guarantees only two calls to foo, will be active at the same time.:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
|
Compatibility: Julia 1.8
This method requires a Julia version of at least 1.8. |
#
Base.release — Function
release(s::Semaphore)
Returns one permission to the pool, if possible allowing another task to receive it and resume execution.
#
Base.lock — Function
lock(lock)
Obtains a lock when it becomes available. If the lock has already been blocked by another task or another thread, it waits until it becomes available.
Each lock must correspond to an unlock (unlock).
lock(f::Function, lock)
Obtains the lock lock, executes f with the lock lock held, and releases the lock lock when control of f returns. If the lock has already been blocked by another task or another thread, it waits until it becomes available.
When this function returns, the lock is released, so the caller should not attempt to unlock it.
See also the description @lock.
|
Compatibility: Julia 1.7
To use |
lock(f::Function, l::Lockable)
Obtains the lock associated with l, executes f with the lock held, and releases the lock when control of f returns. 'f` will receive one positional argument: the value enclosed in l. If the lock has already been blocked by another task or another thread, it waits until it becomes available. When this function returns, the lock is released, so the caller should not attempt to unlock it.
|
Compatibility: Julia 1.11
Requires a version of Julia not lower than 1.11. |
#
Base.unlock — Function
unlock(lock)
Frees up ownership of the lock.
If it is a recursive lock that was obtained earlier, decrease the value of the internal counter and immediately return control.
#
Base.trylock — Function
trylock(lock) -> Success (Boolean)
Obtains the lock if it is available, and returns the value true on success. If the lock has already been blocked by another task or thread, returns `false'.
Each successful trylock lock must correspond to an unlock (unlock).
The 'trylock` function in combination with islocked can be used to write check-check-install algorithms or exponential delay algorithms if `typeof(lock)` is supported (check out the relevant documentation).
#
Base.islocked — Function
islocked(lock) -> Status (Boolean)
Checks whether the lock is being held by any task or thread. This function itself should not be used for synchronization. However, the islocked function in combination with 'trylock` can be used to write check-check-install algorithms or exponential delay algorithms if `typeof(lock)` is supported (check out the relevant documentation).
Advanced Help
For example, exponential delay can be implemented as follows if the implementation of `lock' satisfies the properties described below.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Implementation
To implement the lock, it is recommended to define islocked with the following properties and specify this in the corresponding docstring.
-
The
islocked(lock)function is free from "data race". -
If
islocked(lock)returnsfalse, an immediate call totrylock(lock)should be successful (returnstrue) if there is no interference from other tasks.
#
Base.ReentrantLock — Type
ReentrantLock()
Creates an incoming lock for task synchronization Task. The same task can get blocked as many times as necessary. Each lock must be accompanied by unlock.
Calling lock will also prevent the execution of termination methods for this thread before the corresponding unlock. Using the standard locking pattern shown below should naturally be supported, but beware of inverting the sequence of locking attempts or skipping the retry block entirely (for example, trying to make a return while the lock is still being held):
This makes it possible to organize the memory when receiving and releasing to block or unlock calls.
lock(l)
try
<atomic work>
finally
unlock(l)
end
If !islocked(lck::ReentrantLock) is being held, trylock(lck) succeeds unless there are other tasks trying to hold the lock at the same time.
#
Base.@lock — Macro
@lock l expr
The version of lock(f, l::AbstractLock) as a macro, but with the expression expr instead of the function f. Expands to the following:
lock(l)
try
expr
finally
unlock(l)
end
This is similar to using lock with the 'do` block, but avoids creating a closure and thus can improve performance.
|
Compatibility
The macro |
#
Base.Lockable — Type
Lockable(value, lock = ReentrantLock())
Creates a Lockable object that encapsulates the value' and binds it to the provided `lock'. This object supports @lock`, lock, trylock, unlock. To access the value, access the index of the object being locked while holding the lock.
|
Compatibility: Julia 1.11
Requires a version of Julia not lower than 1.11. |
Example
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # для доступа к значению должна удерживаться блокировка
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
#
Base.AbstractChannel — Type
AbstractChannel{T}
Representation of a channel transmitting objects of type `T'.
#
Base.Channel — Type
Channel{T=Any}(size::Int=0)
Creates a channel with an internal buffer that can contain objects of type T with a maximum number up to size'. `put! calls the full channel block until the object is deleted using take!.
Channel(0) creates an unbuffered channel. put! performs a lock until the corresponding take!' is called.. And vice versa.
Other constructors:
-
Channel(): default constructor, equivalent toChannel{Any}(0) -
Channel(Inf): equivalent toChannel{Any}(typemax(Int)) -
Channel(sz): equivalent toChannel{Any}(sz)
|
Compatibility: Julia 1.3
The default constructor is |
#
Base.Channel — Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Creates a new task based on func, binds it to a new channel of type T with the size size and schedules the task; all this is done in one call. The channel is automatically closed when the task is completed.
func should take the linked channel as the only argument.
If you need a link to the created task, pass the object Ref{Task} via the named argument `taskref'.
If spawn=true, the Task created for func can be scheduled for another thread in parallel, which is equivalent to creating a task via Threads.@spawn.
If spawn=true and the threadpool argument is not set, it defaults to :default.
If the threadpool argument is set (to :default or :interactive), it means that spawn=true and a new task is created in the specified thread pool.
Returns the `Channel'.
Examples
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Creates a link to the created task
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
|
Compatibility: Julia 1.3
The |
|
Compatibility: Julia 1.9
The |
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
#
Base.put! — Method
put!(c::Channel, v)
Adds the v element to the c channel. Performs blocking if the channel is full.
For unbuffered channels, performs blocking until take! will not be completed by another task.
|
Compatibility: Julia 1.1
The |
#
Base.take! — Method
take!(c::Channel)
Deletes and returns a value from Channel in order. Performs a lock until the data becomes available. For unbuffered channels, performs blocking until put! will not be completed by another task.
Examples
Buffered channel:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Unbuffered channel:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
#
Base.isready — Method
isready(c::Channel)
Determines whether it has Channel the value stored in it. Returns control immediately, the lock is not executed.
For unbuffered channels, returns the value true if there are tasks waiting. put!.
Examples
Buffered channel:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Unbuffered channel:
julia> c = Channel();
julia> isready(c) # нет задач, ожидающих put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # планирование задачи put!
julia> isready(c)
true
#
Base.fetch — Method
fetch(c::Channel)
Waits for and returns (without deleting) the first available element from the Channel. Note. 'fetch` is not supported in the unbuffered (zero-size) `Channel'.
Examples
Buffered channel:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # элемент не удаляется
3-element Vector{Any}:
1
2
3
#
Base.bind — Method
bind(chnl::Channel, task::Task)
Compares the lifetime of the chnl with the task. Channel The 'chnl` is automatically closed when the task is completed. Any undetected exception in a task is propagated to all pending objects for chnl.
The 'chnl` object can be explicitly closed regardless of the completion of the task. Completing tasks does not affect already closed Channel objects.
When a channel is linked to multiple tasks, the first task to be terminated will close the channel. When multiple channels are linked to the same task, terminating the task will close all related channels.
Examples
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule and wait
The easiest way is to use it correctly schedule in a Task' that has not yet been started (scheduled). However `schedule and wait can be used as a low-level standard block for building synchronization interfaces. The most important prerequisite for calling schedule(task) is that the caller must "own" the task. That is, she must know that the call to wait in this task occurs in places known to the code calling `schedule(task)'. One strategy for providing such a precondition is to use atomic operations, as shown in the following example:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
# уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
# задачу ожидания.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Поскольку предполагается, что существует только одна задача уведомления (для простоты),
# мы знаем, что другим возможным случаем является OWE_EMPTY.
# Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
# вызвала `wait(ev::OneWayEvent)`.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
# вызовет переход OWE_WAITING -> OWE_NOTIFYING. Задача ожидания должна
# немедленно вызвать`wait()`. В частности, она не должна вызывать никаких функций, которые
# могут выдавать данные в планировщик в этой точке кода.
wait()
else
@assert state == OWE_NOTIFYING
# В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
# задачей уведомления.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# вывод
notifying...
done
'OneWayEvent` allows one task to wait (wait') for a notification (`notify) from another task. This is a limited communication interface, since wait can only be used once from a single task (note the non-atomic assignment of `ev.task').
In this example, notify(ev::OneWayEvent) can call schedule(ev.task) if and only if on changes state from OWE_WAITING to OWE_NOTIFYING'. This way we know that the task executing `wait(ev::OneWayEvent) is now in the ok branch, and that there can be no other tasks trying to execute schedule(ev.task), since their macro is @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.