Документация Engee

Задачи

# Core.TaskType

Task(func)

Создает Task (т. е. сопрограмму) для выполнения данной функции func (которая должна вызываться без аргументов). Задача завершает работу, когда возвращается эта функция. Задача будет выполняться в иерархии определения методов «возраст мира» от родительского объекта при конструировании во время schedule.

Примеры

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

В этом примере b представляет собой готовую к выполнению задачу (Task), которая еще не была запущена.

# Base.@taskMacro

@task

Заключает в оболочку выражение в Task, не выполняя его, и возвращает Task. Это действие только создает задачу, а не выполняет ее.

Примеры

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

# Base.@asyncMacro

@async

Заключает в оболочку выражение в Task и добавляет его в очередь планировщика локального компьютера.

Значения можно интерполировать в @async с помощью $, который копирует значение непосредственно в сконструированное базовое закрытие. Это позволяет вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.

Настоятельно рекомендуется всегда вместо @async использовать Threads.@spawn, даже если параллелизм не требуется, особенно в открыто распространяемых библиотеках. Причина в том, что в текущей реализации Julia использование @async отключает миграцию родительской задачи между рабочими потоками. Поэтому безобидное на первый взгляд использование @async в библиотечной функции может серьезно сказаться на производительности совершенно других частей пользовательского приложения.

Совместимость: Julia 1.4

Интерполяция значений с помощью $ доступна начиная с Julia версии 1.4.

# Base.asyncmapFunction

asyncmap(f, c...; ntasks=0, batch_size=nothing)

Использует несколько параллельных задач для сопоставления f с коллекцией (или несколькими коллекциями одинаковой длины). Для нескольких аргументов коллекции f применяется поэлементно.

ntasks указывает количество задач для параллельного выполнения. Зависит от длины коллекций, если ntasks не указано, для параллельного сопоставления будет использоваться до 100 задач.

ntasks также можно указать в виде функции без аргументов. В этом случае количество запускаемых параллельно задач проверяется до обработки каждого элемента, и запускается новая задача, если значение ntasks_func больше текущего количества задач.

Если указан batch_size, коллекция обрабатывается в пакетном режиме. В этом случае f должна быть функцией, принимающей вектор (Vector) кортежей аргументов и возвращающей вектор результатов. Длина вектора ввода должна равняться длине batch_size или быть меньше.

В следующих примерах демонстрируется выполнение различных задач путем возврата objectid задач, в которых выполняется функция сопоставления.

Сначала, если значение ntasks не задано, каждый элемент обрабатывается в другой задаче.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

Если значение ntasks=2, все элементы обрабатываются в двух задачах.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

Если значение batch_size задано, необходимо изменить функцию сопоставления, чтобы она принимала массив кортежей аргументов и возвращала массив результатов. Для этого используется map в измененной функции сопоставления.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"

В настоящее время все задачи в языке Julia выполняются совместно в одном потоке ОС. Следовательно, asyncmap является выгодным, только когда функция сопоставления использует любые операции ввода-вывода — дисковые, сетевые, удаленные вызовы рабочих ролей и т. д.

# Base.asyncmap!Function

asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Аналогичен asyncmap, но результат сохраняется в results, а не при возврате коллекции.

# Base.current_taskFunction

current_task()

Получает выполняющуюся в настоящий момент задачу (Task).

# Base.istaskdoneFunction

istaskdone(t::Task) -> Bool

Определяет, завершила ли задача работу.

Примеры

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

# Base.istaskstartedFunction

istaskstarted(t::Task) -> Bool

Определяет, начала ли задача выполнение.

Примеры

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false

# Base.istaskfailedFunction

istaskfailed(t::Task) -> Bool

Определяет, завершила ли задача работу из-за выданного исключения.

Примеры

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Совместимость: Julia 1.3

Для этой функции требуется версия не ниже Julia 1.3.

# Base.task_local_storageMethod

task_local_storage(key)

Выполняет поиск значения ключа в локальном для задачи хранилище текущей задачи.

# Base.task_local_storageMethod

task_local_storage(key, value)

Присваивает значение ключу в локальном для задачи хранилище текущей задачи.

# Base.task_local_storageMethod

task_local_storage(body, key, value)

Вызывает функцию body с измененным хранилищем, локальным для задачи, в котором значение value присваивается key; предыдущее значение key или его отсутствие восстанавливается позднее. Полезно для эмуляции динамического определения областей.

Планирование

# Base.yieldFunction

yield()

Переключается на планировщик, чтобы разрешить выполнение другой запланированной задачи. Задача, которая вызывает эту функцию, по-прежнему готова к выполнению и будет немедленно перезапущена, если отсутствуют другие готовые к выполнению задачи.

yield(t::Task, arg = nothing)

Быстрая версия schedule(t, arg); yield() с неточным планированием, которая немедленно выдает t до вызова планировщика.

# Base.yieldtoFunction

yieldto(t::Task, arg = nothing)

Переключение на данную задачу. При первом переключении функция задачи вызывается без аргументов. При последующих переключениях возвращается arg из последнего вызова задачи yieldto. Это низкоуровневый вызов, который только переключает задачи, никоим образом не учитывая состояния или планирование. Его использование не рекомендуется.

# Base.sleepFunction

sleep(seconds)

Блокирует текущую задачу на указанное количество секунд. Минимальное время ожидания — 1 миллисекунда (следует ввести значение 0.001).

# Base.scheduleFunction

schedule(t::Task, [val]; error=false)

Добавляет задачу (Task) в очередь планировщика. Это обеспечивает постоянное выполнение задачи, когда система неактивна, только если задача не выполняет операцию блокирования, например wait.

Если указан второй аргумент val, он будет передан задаче (через возвращаемое значение yieldto) при повторном запуске. Если error имеет значение true, значение вызывается как исключение в пробужденной задаче.

Неправильно использовать schedule для произвольной задачи (Task), которая уже была запущена. Дополнительные сведения см. в справке по API.

Примеры

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true

Синхронизация

# Base.errormonitorFunction

errormonitor(t::Task)

Выводит журнал ошибок в stderr при сбое задачи t.

Примеры

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]

# Base.@syncMacro

@sync

Ожидает, пока все лексически обусловленные использования @async, @spawn, @spawnat и @distributed будут выполнены. Все исключения, выдаваемые вложенными асинхронными операциями, собираются и выдаются как CompositeException.

Примеры

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2

# Base.waitMethod

wait([x])

Блокирует текущую задачу до тех пор, пока не произойдет какое-либо событие, в зависимости от типа аргумента:

  • Channel: ожидает значение, которое следует добавить в канал.

  • Condition: ожидает уведомление (notify) для условия и возвращает параметр val, переданный в notify. Ожидание для условия дополнительно позволяет передать first=true, что приводит к тому, что задача ожидания помещается первой в очередь на пробуждение при уведомлении (notify) вместо обычного поведения FIFO.

  • Process: ожидает завершения работы процесса или цепочки процессов. Поле exitcode процесса можно использовать для определения успешного выполнения или сбоя.

  • Task: ожидает завершения задачи (Task). Если задача завершается сбоем с исключением, выдается TaskFailedException (которое заключает в оболочку завершившуюся сбоем задачу).

  • RawFD: ожидает изменений дескриптора файла (см. пакет FileWatching).

Если аргумент не передается, задача блокируется на неопределенный период времени. Задачу можно перезапустить, только явно вызвав schedule или yieldto.

wait часто вызывается в цикле while, чтобы гарантировать выполнение ожидаемого условия, прежде чем продолжить.

# Base.waitMethod

wait(r::RemoteChannel, args...)

Ожидает, когда станет доступным значение для указанного RemoteChannel.

# Base.waitMethod

wait(r::Future)

Ожидает, когда станет доступным значение для указанного Future.

# Base.fetchMethod

fetch(t::Task)

Ожидает завершения задачи, а затем возвращает значение ее результата. Если задача завершается сбоем с исключением, выдается TaskFailedException (которое заключает в оболочку завершившуюся сбоем задачу) .

# Base.fetchMethod

fetch(x::Any)

Возвращает x.

# Base.timedwaitFunction

timedwait(testcb, timeout::Real; pollint::Real=0.1)

Ожидает, когда testcb() вернет значение true или пройдет timeout секунд, в зависимости от того, что произойдет ранее. Тестовая функция опрашивается каждые pollint секунд. Минимальное значение для pollint — 0,001 с, т. е. 1 миллисекунда.

Возвращает :ok или :timed_out.

# Base.ConditionType

Condition()

Создает источник событий с запуском по фронту, который могут ожидать задачи. Задачи, которые вызывают wait для Condition, приостанавливаются и помещаются в очередь. Задачи пробуждаются, когда notify позднее вызывается для Condition. Запуск по фронту означает, что можно пробудить только задачи, ожидающие во время вызова notify . Для уведомлений с запуском по уровню необходимо поддерживать дополнительное состояние, чтобы отслеживать, имело ли место уведомление. Это делается с помощью типов Channel и Threads.Event, и их можно использовать для событий с запуском по уровню.

Этот объект не является потокобезопасным. Сведения о потокобезопасной версии см. в разделе Threads.Condition.

# Base.Threads.ConditionType

Threads.Condition([lock])

Потокобезопасная версия Base.Condition.

Чтобы вызвать wait или notify для Threads.Condition, необходимо сначала вызвать для него lock. При вызове wait блокировка атомарно освобождается во время блокировки и будет повторно получена до возврата wait. Следовательно, идиоматичное использование Threads.Condition c выглядит следующим образом.

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Совместимость: Julia 1.2

Для этой функции требуется версия не ниже Julia 1.2.

# Base.EventType

Event([autoreset=false])

Создает источник событий с запуском по уровню. Задачи, которые вызывают wait для Event, приостанавливаются и помещаются в очередь, пока не будет вызвано notify для Event. После вызова notify Event остается в сигнальном состоянии, а задачи более не будут блокироваться, ожидая его, пока вызывается reset.

Если autoreset имеет значение true, максимум одна задача будет освобождена из wait для каждого вызова notify.

Это дает возможность упорядочения памяти при получении и освобождении для уведомления или ожидания.

Совместимость: Julia 1.1

Для этой функции требуется версия не ниже Julia 1.1.

Совместимость: Julia 1.8

Для функции autoreset и гарантии упорядочения памяти требуется версия не ниже Julia 1.8.

# Base.notifyFunction

notify(condition, val=nothing; all=true, error=false)

Пробуждает задачи, которые ожидают условие, передавая их val. Если all имеет значение true (по умолчанию), пробуждаются все ожидающие задачи, а в противном случае — только одна. Если error имеет значение true, передаваемое значение вызывается как исключение в пробужденных задачах.

Возвращает количество пробужденных задач. Возвращает 0, если нет задач, ожидающих condition.

# Base.resetMethod

reset(::Event)

Сбрасывает событие обратно в незаданное состояние. Затем все последующие вызовы wait будут блокироваться до повторного вызова notify.

# Base.SemaphoreType

Semaphore(sem_size)

Создает семафор подсчета, разрешающий использовать максимум получений (sem_size) в любое время. Каждое получение должно соответствовать освобождению.

Это дает возможность упорядочения памяти при получении и освобождении для получения или освобождения.

# Base.acquireFunction

acquire(s::Semaphore)

Ожидает доступности одного из разрешений sem_size, выполняя блокировку, пока не удастся его получить.

acquire(f, s::Semaphore)

Выполняет f после получения из семафора s и освобождения (release) при завершении или ошибке.

Например, форма do-block, гарантирующая только два вызова foo, будет активна одновременно:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Совместимость: Julia 1.8

Для этого метода требуется версия Julia не ниже 1.8.

# Base.releaseFunction

release(s::Semaphore)

Возвращает одно разрешение в пул, по возможности разрешая другой задаче получить его и возобновить выполнение.

# Base.AbstractLockType

AbstractLock

Абстрактный супертип, описывающий типы, реализующие примитивы синхронизации: lock, trylock, unlock и islocked.

# Base.lockFunction

lock(lock)

Получает блокировку (lock), когда она становится доступной. Если блокировка уже заблокирована другой задачей или потоком, подождите, когда она станет доступной.

Каждая блокировка (lock) должна соответствовать разблокировке (unlock).

lock(f::Function, lock)

Получает блокировку lock, выполняет f с удерживаемой блокировкой lock и освобождает блокировку lock при возврате f . Если блокировка уже заблокирована другой задачей или другим потоком, ожидает, пока она не станет доступной.

Когда эта функция возвращается, блокировка (lock) освобождается, поэтому вызывающий объект не должен пытаться разблокировать (unlock) ее.

Совместимость: Julia 1.7

Для использования Channel в качестве второго аргумента требуется Julia версии 1.7 или более поздней версии.

# Base.unlockFunction

unlock(lock)

Освобождает владение блокировкой (lock).

Если это рекурсивная блокировка, которая была получена ранее, уменьшает значение внутреннего счетчика и немедленно возвращается.

# Base.trylockFunction

trylock(lock) -> Success (Boolean)

Получает блокировку, если она доступна, и возвращает значение true при успехе. Если блокировка уже заблокирована другой задачей или потоком, возвращается false.

Каждая блокировка (trylock) должна соответствовать разблокировке (unlock).

Функцию trylock в сочетании с islocked можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки (* если поддерживается) typeof(lock)* (ознакомьтесь с соответствующей документацией).

# Base.islockedFunction

islocked(lock) -> Status (Boolean)

Проверяет, удерживается ли блокировка (lock) какой-либо задачей или каким-либо потоком. Эта функция сама по себе не должна использоваться для синхронизации. Однако islocked в сочетании с trylock можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки если поддерживается `typeof(lock)` (ознакомьтесь с соответствующей документацией).

Расширенная справка

Например, экспоненциальная задержка может быть реализована следующим образом, если реализация lock удовлетворяет свойствам, описанным ниже.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Реализация

Реализации блокировки рекомендуется определить islocked со следующими свойствами и указать это в соответствующей документации.

  • Функция islocked(lock) свободна от "гонки по данным".

  • Если islocked(lock) возвращает false, немедленный вызов trylock(lock) должен быть успешным (возвращает true), если нет помех со стороны других задач.

# Base.ReentrantLockType

ReentrantLock()

Создает входящую блокировку для синхронизации задач (Task). Одна и та же задача может получить блокировку столько раз, сколько необходимо. Каждая блокировка (lock) должна соответствовать разблокировке (unlock).

Вызов «блокировки» также воспрепятствует выполнению методов завершения для этого потока до соответствующей «разблокировки». Использование стандартного шаблона блокировки, показанного ниже, должно естественным образом поддерживаться, но остерегайтесь инвертировать последовательность попыток блокировки или полностью пропускать блок повторных попыток (например, пытаться выполнить возврат при все еще удерживаемой блокировке):

Это дает возможность упорядочения памяти при получении и освобождении для блокировки или разблокировки вызовов.

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

Если !islocked(lck::ReentrantLock) удерживается, trylock(lck) выполняется успешно, если только нет других задач, пытающихся удержать блокировку в то же самое время.

Каналы

# Base.AbstractChannelType

AbstractChannel{T}

Представление канала, передающего объекты типа T.

# Base.ChannelType

Channel{T=Any}(size::Int=0)

Конструирует канал (Channel) с внутренним буфером, который может содержать объекты типа T максимальным числом до size. put! вызывает полный блок канала, пока объект не будет удален с помощью take!.

Channel(0) конструирует небуферизованный канал. put! выполняет блокировку до вызова соответствующего take!. И наоборот.

Другие конструкторы:

  • Channel(): конструктор по умолчанию, эквивалентен Channel{Any}(0)

  • Channel(Inf): эквивалентен Channel{Any}(typemax(Int))

  • Channel(sz): эквивалентен Channel{Any}(sz)

Совместимость: Julia 1.3

Конструктор по умолчанию Channel() и size=0 по умолчанию были добавлены в Julia 1.3.

# Base.ChannelMethod

Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)

Создает новую задачу на основе func, привязывает ее к новому каналу типа T размером size и планирует задачу; все это выполняется за один вызов. Канал автоматически закрывается при завершении работы задачи.

func должна принимать привязанный канал в качестве единственного аргумента.

Если вам нужна ссылка на созданную задачу, передайте объект Ref{Task} через аргумент ключевого слова taskref.

Если spawn = true, задачу, созданную для func, можно запланировать для другого потока параллельно, что эквивалентно созданию задачи через Threads.@spawn.

Возвращает Channel.

Примеры

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Создает ссылку на созданную задачу

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Совместимость: Julia 1.3

Параметр spawn= был добавлен в Julia 1.3. Этот конструктор был добавлен в Julia 1.3. В более ранних версиях канал Julia использовал аргументы ключевого слова, чтобы задать size и T, но эти конструкторы являются устаревшими.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"

# Base.put!Method

put!(c::Channel, v)

Добавляет элемент v к каналу c. Выполняет блокировку, если канал полон.

Для небуферизованных каналов выполняет блокировку, пока take! не будет выполнено другой задачей.

Совместимость: Julia 1.1

v теперь преобразуется в тип канала с помощью convert, так как вызывается put!.

# Base.take!Method

take!(c::Channel)

Удаляет и возвращает значение из Channel по порядку. Выполняет блокировку, пока данные не станут доступны. Для небуферизованных каналов выполняет блокировку, пока put! не будет выполнено другой задачей.

Примеры

Буферизованный канал:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Небуферизованный канал:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1

# Base.isreadyMethod

isready(c::Channel)

Определяет, имеет ли Channel значение, хранимое в нем. Возвращает немедленно, блокировка не выполняется.

Для небуферизованных каналов возвращает значение true, если имеются задачи, ожидающие put!.

Примеры

Буферизованный канал:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Небуферизованный канал:

julia> c = Channel();

julia> isready(c)  # нет задач, ожидающих put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # планирование задачи put!

julia> isready(c)
true

# Base.fetchMethod

fetch(c::Channel)

Ожидает и возвращает (без удаления) первый доступный элемент из Channel. Примечание. fetch не поддерживается в небуферизованном канале (с нулевым размером) Channel.

Примеры

Буферизованный канал:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # элемент не удаляется
3-element Vector{Any}:
 1
 2
 3

# Base.closeMethod

close(c::Channel[, excp::Exception])

Закрывает канал. Исключение (которое также может выдаваться excp), выдается:

  • put! для закрытого канала.

  • take! и fetch для пустого закрытого канала.

# Base.bindMethod

bind(chnl::Channel, task::Task)

Сопоставляет время существования chnl с задачей. Канал (Channel) chnl автоматически закрывается при завершении работы задачи. Любое необнаруженное исключение в задаче распространяется на все ожидающие объекты для chnl.

Объект chnl можно явным образом закрыть независимо от завершения работы задачи. Завершение работы задач не влияет на уже закрытые объекты Channel.

Когда канал привязан к нескольким задачам, первая задача, работу которой следует прекратить, закроет канал. Когда несколько каналов привязаны к одной и той же задаче, прекращение работы задачи приведет к закрытию всех привязанных каналов.

Примеры

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]

Низкоуровневая синхронизация с использованием schedule и wait

Проще всего правильно использовать schedule в задаче Task, которая еще не запущена (запланирована). Тем не менее schedule и wait можно использовать в качестве низкоуровневого стандартного блока для построения интерфейсов синхронизации. Важнейшим предварительным условием вызова schedule(task) является то, что вызывающая сторона должна «владеть» task. То есть она должна знать, что вызов wait в данном task происходит в местах, известных коду, вызывающему schedule(task). Одна из стратегий обеспечения такого предварительного условия заключается в использовании атомарных операций, как показано в следующем примере:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
                # уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
                # задачу ожидания.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Поскольку предполагается, что существует только одна задача уведомления (для простоты),
                # мы знаем, что другим возможным случаем является OWE_EMPTY.
                # Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
                # вызвала `wait(ev::OneWayEvent)`.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
        # вызовет переход OWE_WAITING -> OWE_NOTIFYING.  Задача ожидания должна
        # немедленно вызвать`wait()`.  В частности, она не должна вызывать никаких функций, которые
        # могут выдавать данные в планировщик в этой точке кода.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
        # задачей уведомления.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# вывод
notifying...
done

OneWayEvent позволяет одной задаче ждать (wait) уведомления (notify) от другой задачи. Это ограниченный коммуникационный интерфейс, поскольку wait можно использовать только один раз из одной задачи (обратите внимание на неатомарное присваивание ev.task).

В этом примере notify(ev::OneWayEvent) может вызывать schedule(ev.task) тогда и только тогда, когда она изменяет состояние с OWE_WAITING на OWE_NOTIFYING. Таким образом мы узнаем, что задача, выполняющая wait(ev::OneWayEvent), теперь находится в ветви ok, и что не может быть других задач, которые пытаются выполнить schedule(ev.task), так как их макрос @atomicreplace(ev.state, state => OWE_NOTIFYING) завершится ошибкой.