Задачи
#
Core.Task
— Type
Task(func)
Создает Task
(т. е. сопрограмму) для выполнения данной функции func
(которая должна вызываться без аргументов). Задача завершает работу, когда возвращается эта функция. Задача будет выполняться в иерархии определения методов «возраст мира» от родительского объекта при конструировании во время schedule
.
Примеры
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
В этом примере b
представляет собой готовую к выполнению задачу (Task
), которая еще не была запущена.
#
Base.@task
— Macro
@task
Заключает в оболочку выражение в Task
, не выполняя его, и возвращает Task
. Это действие только создает задачу, а не выполняет ее.
Примеры
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.@async
— Macro
@async
Заключает в оболочку выражение в Task
и добавляет его в очередь планировщика локального компьютера.
Значения можно интерполировать в @async
с помощью $
, который копирует значение непосредственно в сконструированное базовое закрытие. Это позволяет вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.
Настоятельно рекомендуется всегда вместо |
Совместимость: Julia 1.4
Интерполяция значений с помощью |
#
Base.asyncmap
— Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
Использует несколько параллельных задач для сопоставления f
с коллекцией (или несколькими коллекциями одинаковой длины). Для нескольких аргументов коллекции f
применяется поэлементно.
ntasks
указывает количество задач для параллельного выполнения. Зависит от длины коллекций, если ntasks
не указано, для параллельного сопоставления будет использоваться до 100 задач.
ntasks
также можно указать в виде функции без аргументов. В этом случае количество запускаемых параллельно задач проверяется до обработки каждого элемента, и запускается новая задача, если значение ntasks_func
больше текущего количества задач.
Если указан batch_size
, коллекция обрабатывается в пакетном режиме. В этом случае f
должна быть функцией, принимающей вектор (Vector
) кортежей аргументов и возвращающей вектор результатов. Длина вектора ввода должна равняться длине batch_size
или быть меньше.
В следующих примерах демонстрируется выполнение различных задач путем возврата objectid
задач, в которых выполняется функция сопоставления.
Сначала, если значение ntasks
не задано, каждый элемент обрабатывается в другой задаче.
julia> tskoid() = objectid(current_task()); julia> asyncmap(x->tskoid(), 1:5) 5-element Array{UInt64,1}: 0x6e15e66c75c75853 0x440f8819a1baa682 0x9fb3eeadd0c83985 0xebd3e35fe90d4050 0x29efc93edce2b961 julia> length(unique(asyncmap(x->tskoid(), 1:5))) 5
Если значение ntasks=2
, все элементы обрабатываются в двух задачах.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2) 5-element Array{UInt64,1}: 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2))) 2
Если значение batch_size
задано, необходимо изменить функцию сопоставления, чтобы она принимала массив кортежей аргументов и возвращала массив результатов. Для этого используется map
в измененной функции сопоставления.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input) batch_func (generic function with 1 method) julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2) 5-element Array{String,1}: "args_tuple: (1,), element_val: 1, task: 9118321258196414413" "args_tuple: (2,), element_val: 2, task: 4904288162898683522" "args_tuple: (3,), element_val: 3, task: 9118321258196414413" "args_tuple: (4,), element_val: 4, task: 4904288162898683522" "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
В настоящее время все задачи в языке Julia выполняются совместно в одном потоке ОС. Следовательно, |
#
Base.istaskdone
— Function
istaskdone(t::Task) -> Bool
Определяет, завершила ли задача работу.
Примеры
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.istaskstarted
— Function
istaskstarted(t::Task) -> Bool
Определяет, начала ли задача выполнение.
Примеры
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
#
Base.istaskfailed
— Function
istaskfailed(t::Task) -> Bool
Определяет, завершила ли задача работу из-за выданного исключения.
Примеры
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Совместимость: Julia 1.3
Для этой функции требуется версия не ниже Julia 1.3. |
#
Base.task_local_storage
— Method
task_local_storage(key)
Выполняет поиск значения ключа в локальном для задачи хранилище текущей задачи.
#
Base.task_local_storage
— Method
task_local_storage(key, value)
Присваивает значение ключу в локальном для задачи хранилище текущей задачи.
#
Base.task_local_storage
— Method
task_local_storage(body, key, value)
Вызывает функцию body
с измененным хранилищем, локальным для задачи, в котором значение value
присваивается key
; предыдущее значение key
или его отсутствие восстанавливается позднее. Полезно для эмуляции динамического определения областей.
Планирование
#
Base.yield
— Function
yield()
Переключается на планировщик, чтобы разрешить выполнение другой запланированной задачи. Задача, которая вызывает эту функцию, по-прежнему готова к выполнению и будет немедленно перезапущена, если отсутствуют другие готовые к выполнению задачи.
yield(t::Task, arg = nothing)
Быстрая версия schedule(t, arg); yield()
с неточным планированием, которая немедленно выдает t
до вызова планировщика.
#
Base.yieldto
— Function
yieldto(t::Task, arg = nothing)
Переключение на данную задачу. При первом переключении функция задачи вызывается без аргументов. При последующих переключениях возвращается arg
из последнего вызова задачи yieldto
. Это низкоуровневый вызов, который только переключает задачи, никоим образом не учитывая состояния или планирование. Его использование не рекомендуется.
#
Base.sleep
— Function
sleep(seconds)
Блокирует текущую задачу на указанное количество секунд. Минимальное время ожидания — 1 миллисекунда (следует ввести значение 0.001
).
#
Base.schedule
— Function
schedule(t::Task, [val]; error=false)
Добавляет задачу (Task
) в очередь планировщика. Это обеспечивает постоянное выполнение задачи, когда система неактивна, только если задача не выполняет операцию блокирования, например wait
.
Если указан второй аргумент val
, он будет передан задаче (через возвращаемое значение yieldto
) при повторном запуске. Если error
имеет значение true
, значение вызывается как исключение в пробужденной задаче.
Неправильно использовать |
Примеры
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Синхронизация
#
Base.errormonitor
— Function
errormonitor(t::Task)
Выводит журнал ошибок в stderr
при сбое задачи t
.
Примеры
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
#
Base.@sync
— Macro
@sync
Ожидает, пока все лексически обусловленные использования @async
, @spawn
, @spawnat
и @distributed
будут выполнены. Все исключения, выдаваемые вложенными асинхронными операциями, собираются и выдаются как CompositeException
.
Примеры
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
#
Base.wait
— Method
wait([x])
Блокирует текущую задачу до тех пор, пока не произойдет какое-либо событие, в зависимости от типа аргумента:
-
Channel
: ожидает значение, которое следует добавить в канал. -
Condition
: ожидает уведомление (notify
) для условия и возвращает параметрval
, переданный вnotify
. Ожидание для условия дополнительно позволяет передатьfirst=true
, что приводит к тому, что задача ожидания помещается первой в очередь на пробуждение при уведомлении (notify
) вместо обычного поведения FIFO. -
Process
: ожидает завершения работы процесса или цепочки процессов. Полеexitcode
процесса можно использовать для определения успешного выполнения или сбоя. -
Task
: ожидает завершения задачи (Task
). Если задача завершается сбоем с исключением, выдаетсяTaskFailedException
(которое заключает в оболочку завершившуюся сбоем задачу). -
RawFD
: ожидает изменений дескриптора файла (см. пакетFileWatching
).
Если аргумент не передается, задача блокируется на неопределенный период времени. Задачу можно перезапустить, только явно вызвав schedule
или yieldto
.
wait
часто вызывается в цикле while
, чтобы гарантировать выполнение ожидаемого условия, прежде чем продолжить.
#
Base.wait
— Method
wait(r::RemoteChannel, args...)
Ожидает, когда станет доступным значение для указанного RemoteChannel
.
#
Base.fetch
— Method
fetch(t::Task)
Ожидает завершения задачи, а затем возвращает значение ее результата. Если задача завершается сбоем с исключением, выдается TaskFailedException
(которое заключает в оболочку завершившуюся сбоем задачу) .
#
Base.timedwait
— Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
Ожидает, когда testcb()
вернет значение true
или пройдет timeout
секунд, в зависимости от того, что произойдет ранее. Тестовая функция опрашивается каждые pollint
секунд. Минимальное значение для pollint
— 0,001 с, т. е. 1 миллисекунда.
Возвращает :ok
или :timed_out
.
#
Base.Condition
— Type
Condition()
Создает источник событий с запуском по фронту, который могут ожидать задачи. Задачи, которые вызывают wait
для Condition
, приостанавливаются и помещаются в очередь. Задачи пробуждаются, когда notify
позднее вызывается для Condition
. Запуск по фронту означает, что можно пробудить только задачи, ожидающие во время вызова notify
. Для уведомлений с запуском по уровню необходимо поддерживать дополнительное состояние, чтобы отслеживать, имело ли место уведомление. Это делается с помощью типов Channel
и Threads.Event
, и их можно использовать для событий с запуском по уровню.
Этот объект не является потокобезопасным. Сведения о потокобезопасной версии см. в разделе Threads.Condition
.
#
Base.Threads.Condition
— Type
Threads.Condition([lock])
Потокобезопасная версия Base.Condition
.
Чтобы вызвать wait
или notify
для Threads.Condition
, необходимо сначала вызвать для него lock
. При вызове wait
блокировка атомарно освобождается во время блокировки и будет повторно получена до возврата wait
. Следовательно, идиоматичное использование Threads.Condition
c
выглядит следующим образом.
lock(c) try while !thing_we_are_waiting_for wait(c) end finally unlock(c) end
Совместимость: Julia 1.2
Для этой функции требуется версия не ниже Julia 1.2. |
#
Base.Event
— Type
Event([autoreset=false])
Создает источник событий с запуском по уровню. Задачи, которые вызывают wait
для Event
, приостанавливаются и помещаются в очередь, пока не будет вызвано notify
для Event
. После вызова notify
Event
остается в сигнальном состоянии, а задачи более не будут блокироваться, ожидая его, пока вызывается reset
.
Если autoreset
имеет значение true, максимум одна задача будет освобождена из wait
для каждого вызова notify
.
Это дает возможность упорядочения памяти при получении и освобождении для уведомления или ожидания.
Совместимость: Julia 1.1
Для этой функции требуется версия не ниже Julia 1.1. |
Совместимость: Julia 1.8
Для функции |
#
Base.notify
— Function
notify(condition, val=nothing; all=true, error=false)
Пробуждает задачи, которые ожидают условие, передавая их val
. Если all
имеет значение true
(по умолчанию), пробуждаются все ожидающие задачи, а в противном случае — только одна. Если error
имеет значение true
, передаваемое значение вызывается как исключение в пробужденных задачах.
Возвращает количество пробужденных задач. Возвращает 0, если нет задач, ожидающих condition
.
#
Base.reset
— Method
reset(::Event)
Сбрасывает событие обратно в незаданное состояние. Затем все последующие вызовы wait
будут блокироваться до повторного вызова notify
.
#
Base.Semaphore
— Type
Semaphore(sem_size)
Создает семафор подсчета, разрешающий использовать максимум получений (sem_size
) в любое время. Каждое получение должно соответствовать освобождению.
Это дает возможность упорядочения памяти при получении и освобождении для получения или освобождения.
#
Base.acquire
— Function
acquire(s::Semaphore)
Ожидает доступности одного из разрешений sem_size
, выполняя блокировку, пока не удастся его получить.
acquire(f, s::Semaphore)
Выполняет f
после получения из семафора s
и освобождения (release
) при завершении или ошибке.
Например, форма do-block, гарантирующая только два вызова foo
, будет активна одновременно:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Совместимость: Julia 1.8
Для этого метода требуется версия Julia не ниже 1.8. |
#
Base.release
— Function
release(s::Semaphore)
Возвращает одно разрешение в пул, по возможности разрешая другой задаче получить его и возобновить выполнение.
#
Base.lock
— Function
lock(lock)
Получает блокировку (lock
), когда она становится доступной. Если блокировка уже заблокирована другой задачей или потоком, подождите, когда она станет доступной.
Каждая блокировка (lock
) должна соответствовать разблокировке (unlock
).
lock(f::Function, lock)
Получает блокировку lock
, выполняет f
с удерживаемой блокировкой lock
и освобождает блокировку lock
при возврате f
. Если блокировка уже заблокирована другой задачей или другим потоком, ожидает, пока она не станет доступной.
Когда эта функция возвращается, блокировка (lock
) освобождается, поэтому вызывающий объект не должен пытаться разблокировать (unlock
) ее.
Совместимость: Julia 1.7
Для использования |
#
Base.unlock
— Function
unlock(lock)
Освобождает владение блокировкой (lock
).
Если это рекурсивная блокировка, которая была получена ранее, уменьшает значение внутреннего счетчика и немедленно возвращается.
#
Base.trylock
— Function
trylock(lock) -> Success (Boolean)
Получает блокировку, если она доступна, и возвращает значение true
при успехе. Если блокировка уже заблокирована другой задачей или потоком, возвращается false
.
Каждая блокировка (trylock
) должна соответствовать разблокировке (unlock
).
Функцию trylock
в сочетании с islocked
можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки (* если поддерживается) typeof(lock)
* (ознакомьтесь с соответствующей документацией).
#
Base.islocked
— Function
islocked(lock) -> Status (Boolean)
Проверяет, удерживается ли блокировка (lock
) какой-либо задачей или каким-либо потоком. Эта функция сама по себе не должна использоваться для синхронизации. Однако islocked
в сочетании с trylock
можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки если поддерживается `typeof(lock)` (ознакомьтесь с соответствующей документацией).
Расширенная справка
Например, экспоненциальная задержка может быть реализована следующим образом, если реализация lock
удовлетворяет свойствам, описанным ниже.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Реализация
Реализации блокировки рекомендуется определить islocked
со следующими свойствами и указать это в соответствующей документации.
-
Функция
islocked(lock)
свободна от "гонки по данным". -
Если
islocked(lock)
возвращаетfalse
, немедленный вызовtrylock(lock)
должен быть успешным (возвращаетtrue
), если нет помех со стороны других задач.
#
Base.ReentrantLock
— Type
ReentrantLock()
Создает входящую блокировку для синхронизации задач (Task
). Одна и та же задача может получить блокировку столько раз, сколько необходимо. Каждая блокировка (lock
) должна соответствовать разблокировке (unlock
).
Вызов «блокировки» также воспрепятствует выполнению методов завершения для этого потока до соответствующей «разблокировки». Использование стандартного шаблона блокировки, показанного ниже, должно естественным образом поддерживаться, но остерегайтесь инвертировать последовательность попыток блокировки или полностью пропускать блок повторных попыток (например, пытаться выполнить возврат при все еще удерживаемой блокировке):
Это дает возможность упорядочения памяти при получении и освобождении для блокировки или разблокировки вызовов.
lock(l) try <atomic work> finally unlock(l) end
Если !islocked(lck::ReentrantLock)
удерживается, trylock(lck)
выполняется успешно, если только нет других задач, пытающихся удержать блокировку в то же самое время.
Каналы
#
Base.Channel
— Type
Channel{T=Any}(size::Int=0)
Конструирует канал (Channel
) с внутренним буфером, который может содержать объекты типа T
максимальным числом до size
. put!
вызывает полный блок канала, пока объект не будет удален с помощью take!
.
Channel(0)
конструирует небуферизованный канал. put!
выполняет блокировку до вызова соответствующего take!
. И наоборот.
Другие конструкторы:
-
Channel()
: конструктор по умолчанию, эквивалентенChannel{Any}(0)
-
Channel(Inf)
: эквивалентенChannel{Any}(typemax(Int))
-
Channel(sz)
: эквивалентенChannel{Any}(sz)
Совместимость: Julia 1.3
Конструктор по умолчанию |
#
Base.Channel
— Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
Создает новую задачу на основе func
, привязывает ее к новому каналу типа T
размером size
и планирует задачу; все это выполняется за один вызов. Канал автоматически закрывается при завершении работы задачи.
func
должна принимать привязанный канал в качестве единственного аргумента.
Если вам нужна ссылка на созданную задачу, передайте объект Ref{Task}
через аргумент ключевого слова taskref
.
Если spawn = true
, задачу, созданную для func
, можно запланировать для другого потока параллельно, что эквивалентно созданию задачи через Threads.@spawn
.
Возвращает Channel
.
Примеры
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Создает ссылку на созданную задачу
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Совместимость: Julia 1.3
Параметр |
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
#
Base.put!
— Method
put!(c::Channel, v)
Добавляет элемент v
к каналу c
. Выполняет блокировку, если канал полон.
Для небуферизованных каналов выполняет блокировку, пока take!
не будет выполнено другой задачей.
Совместимость: Julia 1.1
|
#
Base.take!
— Method
take!(c::Channel)
Удаляет и возвращает значение из Channel
по порядку. Выполняет блокировку, пока данные не станут доступны. Для небуферизованных каналов выполняет блокировку, пока put!
не будет выполнено другой задачей.
Примеры
Буферизованный канал:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Небуферизованный канал:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
#
Base.isready
— Method
isready(c::Channel)
Определяет, имеет ли Channel
значение, хранимое в нем. Возвращает немедленно, блокировка не выполняется.
Для небуферизованных каналов возвращает значение true
, если имеются задачи, ожидающие put!
.
Примеры
Буферизованный канал:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Небуферизованный канал:
julia> c = Channel();
julia> isready(c) # нет задач, ожидающих put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # планирование задачи put!
julia> isready(c)
true
#
Base.fetch
— Method
fetch(c::Channel)
Ожидает и возвращает (без удаления) первый доступный элемент из Channel
. Примечание. fetch
не поддерживается в небуферизованном канале (с нулевым размером) Channel
.
Примеры
Буферизованный канал:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # элемент не удаляется
3-element Vector{Any}:
1
2
3
#
Base.bind
— Method
bind(chnl::Channel, task::Task)
Сопоставляет время существования chnl
с задачей. Канал (Channel
) chnl
автоматически закрывается при завершении работы задачи. Любое необнаруженное исключение в задаче распространяется на все ожидающие объекты для chnl
.
Объект chnl
можно явным образом закрыть независимо от завершения работы задачи. Завершение работы задач не влияет на уже закрытые объекты Channel
.
Когда канал привязан к нескольким задачам, первая задача, работу которой следует прекратить, закроет канал. Когда несколько каналов привязаны к одной и той же задаче, прекращение работы задачи приведет к закрытию всех привязанных каналов.
Примеры
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Низкоуровневая синхронизация с использованием schedule
и wait
Проще всего правильно использовать schedule
в задаче Task
, которая еще не запущена (запланирована). Тем не менее schedule
и wait
можно использовать в качестве низкоуровневого стандартного блока для построения интерфейсов синхронизации. Важнейшим предварительным условием вызова schedule(task)
является то, что вызывающая сторона должна «владеть» task
. То есть она должна знать, что вызов wait
в данном task
происходит в местах, известных коду, вызывающему schedule(task)
. Одна из стратегий обеспечения такого предварительного условия заключается в использовании атомарных операций, как показано в следующем примере:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
# уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
# задачу ожидания.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Поскольку предполагается, что существует только одна задача уведомления (для простоты),
# мы знаем, что другим возможным случаем является OWE_EMPTY.
# Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
# вызвала `wait(ev::OneWayEvent)`.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
# вызовет переход OWE_WAITING -> OWE_NOTIFYING. Задача ожидания должна
# немедленно вызвать`wait()`. В частности, она не должна вызывать никаких функций, которые
# могут выдавать данные в планировщик в этой точке кода.
wait()
else
@assert state == OWE_NOTIFYING
# В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
# задачей уведомления.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# вывод
notifying...
done
OneWayEvent
позволяет одной задаче ждать (wait
) уведомления (notify
) от другой задачи. Это ограниченный коммуникационный интерфейс, поскольку wait
можно использовать только один раз из одной задачи (обратите внимание на неатомарное присваивание ev.task
).
В этом примере notify(ev::OneWayEvent)
может вызывать schedule(ev.task)
тогда и только тогда, когда она изменяет состояние с OWE_WAITING
на OWE_NOTIFYING
. Таким образом мы узнаем, что задача, выполняющая wait(ev::OneWayEvent)
, теперь находится в ветви ok
, и что не может быть других задач, которые пытаются выполнить schedule(ev.task)
, так как их макрос @atomicreplace(ev.state, state => OWE_NOTIFYING)
завершится ошибкой.