Engee documentation

Tasks

Task(func)

Creates a Task (i.e. a coroutine) to execute the function func (which must be callable without arguments). The task terminates when this function returns. When scheduled, the task runs in the "world age" of the parent task at the time of construction.

By default, the sticky bit t.sticky is set to true for tasks. This models the traditional default behavior of @async. Sticky tasks can only be run on the worker thread they were first scheduled on, and when scheduled they also make the task they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.

Examples

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

In this example, b represents a ready-to-run task that has not yet been started.
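A minimal sketch of the sticky-bit behavior described above, reusing the function a from the example (b.sticky is the pinning bit; clearing it allows migration across worker threads, as with Threads.@spawn):

julia> b = Task(a);

julia> b.sticky = false;  # allow the task to migrate between worker threads

julia> schedule(b);

julia> fetch(b)
500500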

@task

Wraps an expression in a Task without executing it, and returns the Task. This only creates a task and does not run it.

By default, the sticky bit t.sticky is set to true for tasks. This models the traditional default behavior of @async. Sticky tasks can only be run on the worker thread they were first scheduled on, and when scheduled they also make the task they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.

Examples

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
@async

Wraps the expression in a Task and adds it to the local machine's scheduler queue.

Values can be interpolated into @async using $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.

It is strongly recommended to favor Threads.@spawn over @async even when no parallelism is required, especially in publicly distributed libraries. This is because a use of @async disables the migration of the parent task across worker threads in the current implementation of Julia. Thus, a seemingly innocent use of @async in a library function can have a large impact on the performance of very different parts of user applications.

Compatibility: Julia 1.4

Interpolation of values using $ is available starting from Julia 1.4.
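A minimal sketch of interpolation with $ (the variable x is illustrative); the interpolated copy is unaffected by later changes to the variable in the current task:

julia> x = 2;

julia> t = @async (sleep(0.1); $x + 1);

julia> x = 100;

julia> fetch(t)  # uses the value of x captured when the task was created
3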

asyncmap(f, c...; ntasks=0, batch_size=nothing)

Uses multiple concurrent tasks to map f over a collection (or multiple collections of equal length). For multiple collection arguments, f is applied elementwise.

ntasks specifies the number of tasks to run concurrently. If ntasks is unspecified, up to 100 tasks will be used for concurrent mapping, depending on the length of the collections.

ntasks can also be specified as a zero-argument function. In this case, the number of tasks running in parallel is checked before processing every element, and a new task is started if the value of ntasks_func is greater than the current number of tasks.

If batch_size is specified, the collection is processed in batch mode. f must then be a function that accepts a vector of argument tuples and returns a vector of results. The input vector will have a length of batch_size or less.

The following examples highlight execution in different tasks by returning the objectid of the tasks in which the mapping function is executed.

First, if the ntasks value is not set, each item is processed in a different task.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

With ntasks=2, all elements are processed in two tasks.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

With batch_size defined, the mapping function must be changed so that it accepts an array of argument tuples and returns an array of results. map is used in the modified mapping function to achieve this.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Like asyncmap, but stores the output in results rather than returning a collection.

Behavior can be unexpected when any mutated argument shares memory with any other argument.

current_task()

Returns the currently running Task.

istaskdone(t::Task) -> Bool

Determines whether the task has completed its work.

Examples

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
istaskstarted(t::Task) -> Bool

Determines whether the task has started execution.

Examples

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
istaskfailed(t::Task) -> Bool

Determines whether the task has completed due to an exception.

Examples

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Compatibility: Julia 1.3

This feature requires a Julia version of 1.3 or higher.

task_local_storage(key)

Looks up the value of key in the current task's task-local storage.

task_local_storage(key, value)

Assigns a value to a key in the task’s local storage for the current task.

task_local_storage(body, key, value)

Calls the function body with a modified task-local storage in which value is assigned to key; the previous value of key, or its absence, is restored afterwards. Useful for emulating dynamic scoping.
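A minimal sketch of the three forms above (the key :mode is illustrative):

julia> task_local_storage(:mode, "fast");  # assign a value for the current task

julia> task_local_storage(:mode)           # look it up
"fast"

julia> task_local_storage(:mode, "slow") do  # temporarily override within the body
           task_local_storage(:mode)
       end
"slow"

julia> task_local_storage(:mode)           # the previous value is restored
"fast"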

Scheduling

yield()

Switches to the scheduler to allow the execution of another scheduled task. The task that calls this function is still ready to run and will be restarted immediately if there are no other tasks ready to run.


yield(t::Task, arg = nothing)

A fast, unfairly scheduled version of schedule(t, arg); yield() that immediately yields to t before calling the scheduler.

yieldto(t::Task, arg = nothing)

Switches to the given task. The first time a task is switched to, the task's function is called without arguments. On subsequent switches, arg is returned from the task's last call to yieldto. This is a low-level call that only switches tasks, without considering states or scheduling in any way. Its use is discouraged.

sleep(seconds)

Blocks the current task for the specified number of seconds. The minimum sleep time is 1 millisecond, i.e. a value of 0.001.

schedule(t::Task, [val]; error=false)

Adds a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.

If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.

It is incorrect to use schedule on an arbitrary Task that has already been started. See the API reference for more information.

By default, the sticky bit t.sticky is set to true for tasks. This models the traditional default behavior of @async. Sticky tasks can only be run on the worker thread they were first scheduled on, and when scheduled they also make the task they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.

Examples

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
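The following sketch illustrates the val argument described above; it assumes that a task blocked in a bare wait() receives the value passed to schedule as the return value of wait():

julia> t = @task begin
           v = wait()  # block until rescheduled; receives the value passed to schedule
           println("received ", v)
       end;

julia> schedule(t); yield();  # start the task; it blocks in wait()

julia> schedule(t, 42); yield();  # restart it, passing a value
received 42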

Synchronization

errormonitor(t::Task)

Prints an error log to stderr if task t fails.

Examples

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
@sync

Waits until all lexically enclosed uses of @async, @spawn, Distributed.@spawnat and Distributed.@distributed are complete. All exceptions thrown by enclosed asynchronous operations are collected and thrown as a CompositeException.

Examples

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2

Special note for Threads.Condition.

The caller must be holding the lock that owns the Threads.Condition before calling this method. The calling task will be blocked until some other task wakes it, usually by calling notify on the same Threads.Condition object. The lock will be atomically released when blocking (even if it was locked recursively), and will be reacquired before returning.


wait(r::Future)

Wait for a value to become available for the specified Future.


wait(r::RemoteChannel, args...)

Wait for a value to become available on the specified RemoteChannel.


wait([x])

Blocks the current task until an event occurs, depending on the type of the argument:

  • Channel: Wait for a value to be appended to the channel.

  • Condition: Wait for notify on a condition and return the val parameter passed to notify. Waiting on a condition additionally allows passing first=true, which puts the waiting task first in line to be woken on notify instead of the usual first-in-first-out behavior.

  • Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.

  • Task: Wait for a Task to finish. If the task fails with an exception, a TaskFailedException is thrown (which wraps the failed task).

  • RawFD: Wait for changes on a file descriptor (see the FileWatching package).

If no argument is passed, the task is blocked for an indefinite period of time. The task can be restarted only by explicitly calling schedule or yieldto.

wait is often called within a while loop to ensure that a waited-for condition is met before proceeding.
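A minimal sketch of this pattern using a Condition and a flag (the names are illustrative); the loop re-checks the flag after every wakeup:

cond = Condition()
ready = false

waiter = @async begin
    while !ready        # re-check the waited-for condition after each wakeup
        wait(cond)
    end
    println("ready")
end

yield()                 # let the waiter start and block in wait(cond)
ready = true
notify(cond)
wait(waiter)            # blocks until the waiter re-checks the flag, prints "ready", and finishes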


wait(c::Channel)

Blocks the task until the Channel is ready (isready).

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # the task is blocked because the channel is not ready
false

julia> put!(c, 1);

julia> istaskdone(task)  # the task is now unblocked
true
fetch(t::Task)

Waits for the Task to finish, then returns its result value. If the task fails with an exception, a TaskFailedException is thrown (which wraps the failed task).
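A minimal sketch:

julia> t = @async (sleep(0.1); 1 + 1);

julia> fetch(t)
2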

fetch(x::Any)

Returns x.

timedwait(testcb, timeout::Real; pollint::Real=0.1)

Waits until testcb() returns true or timeout seconds have passed, whichever happens first. The test function is polled every pollint seconds. The minimum value for pollint is 0.001 seconds, i.e. 1 millisecond.

Returns :ok or :timed_out.

Examples

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
Condition()

Creates an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Waiting on a condition can return a value or raise an error if the optional arguments of notify are used. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to track whether a notification has happened. The Channel and Threads.Event types do this and can be used for level-triggered events.

This object is not thread-safe. See Threads.Condition for a thread-safe version.

Threads.Condition([lock])

A thread-safe version of Base.Condition.

To call wait or notify on a Threads.Condition, you must first call lock on it. When wait is called, the lock is atomically released while blocking, and will be reacquired before wait returns. Therefore, idiomatic use of a Threads.Condition c looks like the following:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Compatibility: Julia 1.2

This feature requires a Julia version of at least 1.2.

Event([autoreset=false])

Creates a level-triggered event source. Tasks that call wait on an Event are suspended and queued until notify is called on the Event. After notify is called, the Event remains in a signaled state and tasks no longer block when waiting for it, until reset is called.

If autoreset is true, at most one task will be released from wait for each call to notify.

This provides an acquire/release memory ordering on notify/wait.

Compatibility: Julia 1.1

This feature requires a Julia version of at least 1.1.

Compatibility: Julia 1.8

The autoreset functionality and the memory ordering guarantee require Julia 1.8 or later.
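A minimal sketch of the level-triggered behavior described above:

ev = Base.Event()

t = @async begin
    wait(ev)            # blocks until the event is notified
    println("event fired")
end

yield()                 # let the task block in wait(ev)
notify(ev)              # wakes the waiter; the event stays signaled
wait(t)                 # the task prints "event fired" and finishes

wait(ev)                # returns immediately: the event is still signaled
reset(ev)               # back to the un-set state; future waits block again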

notify(condition, val=nothing; all=true, error=false)

Wakes up the tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken; otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.

Returns the number of tasks woken up. Returns 0 if no tasks are waiting on condition.
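A minimal sketch showing the delivery of val and the return value of notify:

julia> c = Condition();

julia> t = @async println("woke up with ", wait(c));

julia> yield();            # let the task block in wait(c)

julia> n = notify(c, 42);  # wake the task, passing 42

julia> yield();            # let the woken task run
woke up with 42

julia> n                   # one task was woken
1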

reset(::Event)

Resets an Event back to an un-set state. Any future calls to wait will then block until notify is called again.

Semaphore(sem_size)

Creates a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.

This provides an acquire/release memory ordering on acquire/release calls.

acquire(s::Semaphore)

Waits for one of the sem_size permits to become available, blocking until one can be acquired.


acquire(f, s::Semaphore)

Executes f after acquiring from the semaphore s, and releases (release) on completion or error.

For example, a do-block form that ensures only 2 calls of foo will be active at the same time:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Compatibility: Julia 1.8

This method requires a Julia version of at least 1.8.

release(s::Semaphore)

Returns one permit to the pool, possibly allowing another task to acquire it and resume execution.
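A minimal sketch pairing acquire with release, following the pattern described above:

s = Base.Semaphore(2)   # at most 2 permits in use at any time

Base.acquire(s)
try
    # work limited to 2 concurrent holders
finally
    Base.release(s)     # each acquire must be matched with a release
end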

AbstractLock

An abstract supertype that describes types that implement synchronization primitives: lock, trylock, unlock and islocked.

lock(lock)

Acquires the lock when it becomes available. If the lock is already locked by a different task or thread, waits for it to become available.

Each lock must be matched by an unlock.


lock(f::Function, lock)

Acquires the lock lock, executes f with the lock held, and releases the lock when f returns. If the lock is already locked by a different task or thread, waits for it to become available.

When this function returns, the lock is released, so the caller should not attempt to unlock it.

See also @lock.

Compatibility: Julia 1.7

Using a Channel as the second argument requires Julia 1.7 or later.
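A minimal sketch of the do-block form described above (the lock and vector names are illustrative):

l = ReentrantLock()
results = Int[]

lock(l) do
    push!(results, 1)   # executed while holding l; the lock is released when the block returns
end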


lock(f::Function, l::Lockable)

Acquires the lock associated with l, executes f with the lock held, and releases the lock when f returns. f receives one positional argument: the value wrapped by l. If the lock is already locked by a different task or thread, waits for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.

Compatibility: Julia 1.11

Requires a version of Julia not lower than 1.11.

unlock(lock)

Releases ownership of the lock.

If this is a recursive lock that has been acquired before, decrements an internal counter and returns immediately.

trylock(lock) -> Success (Boolean)

Acquires the lock if it is available, and returns true if successful. If the lock is already locked by a different task or thread, returns false.

Each successful trylock must be matched by an unlock.

trylock combined with islocked can be used to write test-and-test-and-set or exponential backoff algorithms, if supported by typeof(lock) (read its documentation).

islocked(lock) -> Status (Boolean)

Checks whether the lock is held by any task or thread. This function alone should not be used for synchronization. However, islocked combined with trylock can be used to write test-and-test-and-set or exponential backoff algorithms, if supported by typeof(lock) (read its documentation).

Advanced Help

For example, an exponential backoff can be implemented as follows if the lock implementation satisfies the properties documented below.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implementation

A lock implementation is advised to define islocked with the following properties and note them in its docstring.

  • islocked(lock) is data-race-free.

  • If islocked(lock) returns false, an immediate invocation of trylock(lock) must succeed (return true) if there is no interference from other tasks.

ReentrantLock()

Creates a re-entrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required (this is what the "Reentrant" part of the name means). Each lock must be matched with an unlock.

Calling lock will also inhibit the running of finalizers on that thread until the corresponding unlock. Use of the standard lock pattern illustrated below should naturally be supported, but beware of inverting the try/lock order or missing the try block entirely (for example, attempting to return with the lock still held):

This provides an acquire/release memory ordering on lock/unlock calls.

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

If !islocked(lck::ReentrantLock) holds, trylock(lck) succeeds unless there are other tasks attempting to hold the lock at the same time.

@lock l expr

Macro version of lock(f, l::AbstractLock), but with the expression expr instead of the function f. Expands to the following:

lock(l)
try
    expr
finally
    unlock(l)
end

This is similar to using lock with a do block, but avoids creating a closure and thus can improve performance.

Compatibility

The macro @lock was added in Julia 1.3 and exported in Julia 1.10.

Lockable(value, lock = ReentrantLock())

Creates a Lockable object that wraps value and associates it with the provided lock. This object supports @lock, lock, trylock, and unlock. To access the value, index the lockable object while holding the lock.

Compatibility: Julia 1.11

Requires a version of Julia not lower than 1.11.

Example

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # the lock must be held to access the value
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"

Channels

AbstractChannel{T}

Representation of a channel transmitting objects of type T.

Channel{T=Any}(size::Int=0)

Constructs a channel with an internal buffer that can hold at most size objects of type T. put! calls on a full channel block until an object is removed with take!.

Channel(0) creates an unbuffered channel. put! blocks until a matching take! is called, and vice versa.

Other constructors:

  • Channel(): default constructor, equivalent to Channel{Any}(0)

  • Channel(Inf): equivalent to Channel{Any}(typemax(Int))

  • Channel(sz): equivalent to Channel{Any}(sz)

Compatibility: Julia 1.3

The default constructor Channel() and the default size=0 were added in Julia 1.3.

Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Creates a new task based on func, binds it to a new channel of type T with the size size and schedules the task; all this is done in one call. The channel is automatically closed when the task is completed.

func should take the linked channel as the only argument.

If you need a reference to the created task, pass a Ref{Task} object via the keyword argument taskref.

If spawn=true, the Task created for func may be scheduled on another thread in parallel, equivalent to creating a task via Threads.@spawn.

If spawn=true and the threadpool argument is not set, it defaults to :default.

If the threadpool argument is set (to :default or :interactive), it means that spawn=true and a new task is created in the specified thread pool.

Returns the Channel.

Examples

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Referencing the created task:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Compatibility: Julia 1.3

The spawn= parameter was added in Julia 1.3. This constructor was added in Julia 1.3. In earlier versions of Julia, Channel used keyword arguments to set size and T, but those constructors are deprecated.

Compatibility: Julia 1.9

The threadpool= argument was added in Julia 1.9.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
put!(c::Channel, v)

Appends the item v to the channel c. Blocks if the channel is full.

For unbuffered channels, blocks until a take! is performed by a different task.

Compatibility: Julia 1.1

v now gets converted to the channel's type with convert as put! is called.
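A minimal sketch with a buffered channel:

julia> c = Channel(1);

julia> put!(c, "hello");

julia> take!(c)
"hello"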

take!(c::Channel)

Removes and returns a value from a Channel in order. Blocks until data is available. For unbuffered channels, blocks until a put! is performed by a different task.

Examples

Buffered channel:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Unbuffered channel:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
isready(c::Channel)

Determines whether a Channel has a value stored in it. Returns immediately and does not block.

For unbuffered channels, returns true if there are tasks waiting on a put!.

Examples

Buffered channel:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Unbuffered channel:

julia> c = Channel();

julia> isready(c)  # no tasks waiting on a put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # schedule the put! task

julia> isready(c)
true
fetch(c::Channel)

Waits for and returns (without removing) the first available item from the Channel. Note: fetch is not supported on an unbuffered (zero-size) Channel.

Examples

Buffered channel:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # the item is not removed
3-element Vector{Any}:
 1
 2
 3
close(c::Channel[, excp::Exception])

Closes the channel. An exception (optionally given by excp) is thrown by the following calls (a sketch follows the list):

  • put! on a closed channel.

  • take! and fetch on an empty, closed channel.
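A minimal sketch (the exact error text may vary between Julia versions):

julia> c = Channel(1);

julia> put!(c, 1);

julia> close(c);

julia> take!(c)  # an already-buffered value can still be taken
1

julia> take!(c)  # the channel is now empty and closed
ERROR: InvalidStateException: Channel is closed.
[...]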

bind(chnl::Channel, task::Task)

Associates the lifetime of chnl with the task. The Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.

The chnl object can be explicitly closed independently of task termination. Terminating tasks have no effect on already closed Channel objects.

When a channel is bound to multiple tasks, the first task to terminate will close the channel. When multiple channels are bound to the same task, termination of the task will close all of the bound channels.

Examples

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]

Low-level synchronization using schedule and wait

The easiest correct use of schedule is on a Task that has not been started (scheduled) yet. However, schedule and wait can be used as a very low-level building block for constructing synchronization interfaces. A crucial precondition of invoking schedule(task) is that the caller must "own" the task; that is, it must know that the call to wait in the given task happens at locations known to the code calling schedule(task). One strategy for ensuring such a precondition is to use atomic operations, as demonstrated in the following example:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Keep trying until the state is updated to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # The transition OWE_WAITING -> OWE_NOTIFYING means the waiter task is
                # already waiting or is about to call `wait`. The notifier task must wake
                # up the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we assume there is only one notifier task (for simplicity),
                # we know that the only other possible case is OWE_EMPTY.
                # We do not need to do anything here, because we know that the waiter task
                # has not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # The transition OWE_EMPTY -> OWE_WAITING means that the notifier task is
        # guaranteed to cause the OWE_WAITING -> OWE_NOTIFYING transition. The waiter
        # task must call `wait()` immediately. In particular, it must not call any
        # function that may yield to the scheduler at this point in the code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, `state` must already have been moved to OWE_NOTIFYING
        # by the notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent lets one task wait for another task's notify. It is a limited communication interface, since wait can only be used once from a single task (note the non-atomic assignment of ev.task).

In this example, notify(ev::OneWayEvent) is allowed to call schedule(ev.task) if and only if it modifies the state from OWE_WAITING to OWE_NOTIFYING. This lets us know that the task executing wait(ev::OneWayEvent) is now in the ok branch, and that there cannot be other tasks trying to call schedule(ev.task), since their @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.