Задачи
#
Core.Task
— Type
Task(func)
Создает Task
(т. е. сопрограмму) для выполнения данной функции func
(которая должна вызываться без аргументов). Задача завершает работу, когда возвращается эта функция. Задача будет выполняться в иерархии определения методов «возраст мира» от родительского объекта при конструировании во время schedule
.
По умолчанию для задач бит закрепления установлен в значение true |
Примеры
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
В этом примере b
представляет собой готовую к выполнению задачу (Task
), которая еще не была запущена.
#
Base.@task
— Macro
@task
Заключает в оболочку выражение в Task
, не выполняя его, и возвращает Task
. Это действие только создает задачу, а не выполняет ее.
По умолчанию для задач бит закрепления установлен в значение true |
Примеры
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.@async
— Macro
@async
Заключает в оболочку выражение в Task
и добавляет его в очередь планировщика локального компьютера.
Значения можно интерполировать в @async
с помощью $
, который копирует значение непосредственно в сконструированное базовое замыкание. Это позволяет вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.
Настоятельно рекомендуется всегда отдавать предпочтение |
Совместимость: Julia 1.4
Интерполяция значений с помощью |
#
Base.asyncmap
— Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
Использует несколько параллельных задач для сопоставления f
с коллекцией (или несколькими коллекциями одинаковой длины). Для нескольких аргументов коллекции f
применяется поэлементно.
ntasks
указывает количество задач для параллельного выполнения. Если ntasks
не указано, в зависимости от длины коллекций для параллельного сопоставления будет использоваться до 100 задач.
ntasks
также можно указать в виде функции без аргументов. В данном случаеколичество запускаемых параллельно задач проверяется до обработки каждого элемента, и запускается новая задача, если значение ntasks_func
больше текущего количества задач.
Если указан batch_size
, коллекция обрабатывается в пакетном режиме. В таком случае f
должно быть функцией, принимающей вектор (Vector
) кортежей аргументов и возвращающей вектор результатов. Длина вектора ввода должна равняться длине batch_size
или быть меньше.
В следующих примерах демонстрируется выполнение различных задач путем возврата objectid
задач, в которых выполняется функция сопоставления.
Сначала, если значение ntasks
не задано, каждый элемент обрабатывается в другой задаче.
julia> tskoid() = objectid(current_task()); julia> asyncmap(x->tskoid(), 1:5) 5-element Array{UInt64,1}: 0x6e15e66c75c75853 0x440f8819a1baa682 0x9fb3eeadd0c83985 0xebd3e35fe90d4050 0x29efc93edce2b961 julia> length(unique(asyncmap(x->tskoid(), 1:5))) 5
Если значение ntasks=2
, все элементы обрабатываются в двух задачах.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2) 5-element Array{UInt64,1}: 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 0xa23d2f80cd7cf157 0x027ab1680df7ae94 julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2))) 2
Если значение batch_size
задано, необходимо изменить функцию сопоставления, чтобы она принимала массив кортежей аргументов и возвращала массив результатов. Для этого используется map
в измененной функции сопоставления.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input) batch_func (generic function with 1 method) julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2) 5-element Array{String,1}: "args_tuple: (1,), element_val: 1, task: 9118321258196414413" "args_tuple: (2,), element_val: 2, task: 4904288162898683522" "args_tuple: (3,), element_val: 3, task: 9118321258196414413" "args_tuple: (4,), element_val: 4, task: 4904288162898683522" "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
#
Base.asyncmap!
— Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Аналогично asyncmap
, но результат сохраняется в results
, а возвращается коллекция.
Если какой-либо измененный аргумент размещается в одной области памяти с любым другим аргументом, поведение может быть непредвиденным. |
#
Base.istaskdone
— Function
istaskdone(t::Task) -> Bool
Определяет, завершила ли задача работу.
Примеры
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.istaskstarted
— Function
istaskstarted(t::Task) -> Bool
Определяет, начала ли задача выполнение.
Примеры
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
#
Base.istaskfailed
— Function
istaskfailed(t::Task) -> Bool
Определяет, завершила ли задача работу из-за выданного исключения.
Примеры
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Совместимость: Julia 1.3
Для этой функции требуется версия Julia не ниже 1.3. |
#
Base.task_local_storage
— Method
task_local_storage(key)
Выполняет поиск значения ключа в локальном для задачи хранилище текущей задачи.
#
Base.task_local_storage
— Method
task_local_storage(key, value)
Присваивает значение ключу в локальном для задачи хранилище текущей задачи.
#
Base.task_local_storage
— Method
task_local_storage(body, key, value)
Вызывает функцию body
с измененным хранилищем, локальным для задачи, в котором значение value
присваивается key
; предыдущее значение key
или его отсутствие восстанавливается позднее. Полезно для эмуляции динамического определения областей.
Планирование
#
Base.yield
— Function
yield()
Переключается на планировщик, чтобы разрешить выполнение другой запланированной задачи. Задача, которая вызывает эту функцию, по-прежнему готова к выполнению и будет немедленно перезапущена, если отсутствуют другие готовые к выполнению задачи.
yield(t::Task, arg = nothing)
Быстрая версия schedule(t, arg); yield()
с неточным планированием, которая немедленно выдает t
до вызова планировщика.
#
Base.yieldto
— Function
yieldto(t::Task, arg = nothing)
Выполняет переключение на заданную задачу. При первом переключении функция задачи вызывается без аргументов. При последующих переключениях возвращается arg
из последнего вызова yieldto
задачи. Это низкоуровневый вызов, который только переключает задачи, никоим образом не учитывая состояния или планирование. Его использование не рекомендуется.
#
Base.sleep
— Function
sleep(seconds)
Блокирует текущую задачу на указанное количество секунд. Минимальное время ожидания — 1 миллисекунда (следует ввести значение 0.001
).
#
Base.schedule
— Function
schedule(t::Task, [val]; error=false)
Добавляет задачу Task
в очередь планировщика. Это обеспечивает постоянное выполнение задачи, когда система неактивна, только если задача не выполняет операцию блокирования, например wait
.
Если указан второй аргумент val
, он будет передан задаче (через возвращаемое значение yieldto
) при повторном запуске. Если error
имеет значение true
, значение вызывается как исключение в пробужденной задаче.
Неправильно использовать |
По умолчанию для задач бит закрепления установлен в значение true |
Примеры
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Синхронизация
#
Base.errormonitor
— Function
errormonitor(t::Task)
Выводит журнал ошибок в stderr
при сбое задачи t
.
Примеры
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
#
Base.@sync
— Macro
@sync
Ожидает, пока все лексически обусловленные использования @async
, @spawn
, Distributed.@spawnat
и Distributed.@distributed
будут выполнены. Все исключения, выдаваемые вложенными асинхронными операциями, собираются и выдаются как CompositeException
.
Примеры
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
#
Base.wait
— Function
Специальное примечание для Threads.Condition
.
Вызывающий объект должен удерживать блокировку (lock
), которая владеет Threads.Condition
, до вызова этого метода. Вызывающая задача будет заблокирована, пока ее не пробудит какая-нибудь другая задача, обычно путем вызова notify
для того же самого объекта Threads.Condition
. Если выполнялась блокировка, она будет атомарно освобождена (даже если она осуществлялась рекурсивно) и будет повторно получена до возврата управления.
wait(r::Future)
Wait for a value to become available for the specified Future
.
wait(r::RemoteChannel, args...)
Wait for a value to become available on the specified RemoteChannel
.
wait([x])
Блокирует текущую задачу до тех пор, пока не произойдет какое-либо событие, в зависимости от типа аргумента:
-
Channel
: Wait for a value to be appended to the channel. -
Condition
: Wait fornotify
on a condition and return theval
параметр, переданный вnotify
. Ожидание для условия дополнительно позволяет передатьfirst=true
, что приводит к тому, что задача ожидания помещается первой в очередь на пробуждение при уведомлении (notify
) вместо обычного поведения FIFO. -
Process
: ожидает завершения работы процесса или цепочки процессов. Полеexitcode
процесса можно использовать для определения успешного выполнения или сбоя. -
Task
: Wait for aTask
to finish. If the task fails with an exception, a ВыдаетсяTaskFailedException
(которое инкапсулирует завершившуюся сбоем задачу). -
RawFD
: Wait for changes on a file descriptor (see theFileWatching
package).
Если аргумент не передается, задача блокируется на неопределенный период времени. Задачу можно перезапустить, только явно вызвав schedule
или yieldto
.
wait
часто вызывается в цикле while
, чтобы гарантировать выполнение ожидаемого условия, прежде чем продолжить.
wait(c::Channel)
Блокирует задачу, пока канал Channel
не будет готов (isready
).
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # задача блокируется, так как канал не готов
false
julia> put!(c, 1);
julia> istaskdone(task) # теперь задача разблокирована
true
#
Base.fetch
— Method
fetch(t::Task)
Ожидает завершения задачи Task
, а затем возвращает значение ее результата. Если задача завершается сбоем с исключением, выдается TaskFailedException
(которое инкапсулирует завершившуюся сбоем задачу).
#
Base.timedwait
— Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
Ожидает, когда testcb()
вернет значение true
или когда пройдет timeout
секунд, в зависимости от того, что произойдет ранее. Тестовая функция опрашивается каждые pollint
секунд. Минимальное значение для pollint
— 0,001 с, т. е. 1 миллисекунда.
Возвращает :ok
или :timed_out
.
Примеры
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
#
Base.Condition
— Type
Condition()
Создает источник событий с запуском по фронту, который могут ожидать задачи. Задачи, которые вызывают wait
для Condition
, приостанавливаются и помещаются в очередь. Задачи пробуждаются, когда notify
позднее вызывается для Condition
. Ожидание условия может вернуть значение или вызвать ошибку, если используются необязательные аргументы notify
. Запуск по фронту означает, что можно пробудить только задачи, ожидающие во время вызова notify
. Для уведомлений с запуском по уровню необходимо поддерживать дополнительное состояние, чтобы отслеживать, имело ли место уведомление. Это делается с помощью типов Channel
и Threads.Event
, и их можно использовать для событий с запуском по уровню.
Этот объект не является потокобезопасным. Сведения о потокобезопасной версии см. в описании Threads.Condition
.
#
Base.Threads.Condition
— Type
Threads.Condition([lock])
Потокобезопасная версия Base.Condition
.
Чтобы вызвать wait
или notify
для Threads.Condition
, необходимо сначала вызвать для него lock
. При вызове wait
блокировка атомарно освобождается во время блокировки и будет повторно получена до возврата wait
. Следовательно, идиоматичное использование Threads.Condition
c
выглядит следующим образом:
lock(c) try while !thing_we_are_waiting_for wait(c) end finally unlock(c) end
Совместимость: Julia 1.2
Для этой функции требуется версия Julia не ниже 1.2. |
#
Base.Event
— Type
Event([autoreset=false])
Создает источник событий с запуском по уровню. Задачи, которые вызывают wait
для Event
, приостанавливаются и помещаются в очередь, пока не будет вызвано notify
для Event
. После вызова notify
Event
остается в сигнальном состоянии, а задачи более не будут блокироваться, ожидая его, пока вызывается reset
.
Если autoreset
имеет значение true, максимум одна задача будет освобождена из wait
для каждого вызова notify
.
Это дает возможность упорядочения памяти при получении и освобождении для уведомления или ожидания.
Совместимость: Julia 1.1
Для этой функции требуется версия Julia не ниже 1.1. |
Совместимость: Julia 1.8
Для функции |
#
Base.notify
— Function
notify(condition, val=nothing; all=true, error=false)
Пробуждает задачи, которые ожидают условие, передавая их val
. Если all
имеет значение true
(по умолчанию), пробуждаются все ожидающие задачи, а в противном случае — только одна. Если error
имеет значение true
, передаваемое значение вызывается как исключение в пробужденных задачах.
Возвращает количество пробужденных задач. Возвращает 0, если нет задач, ожидающих condition
.
#
Base.Semaphore
— Type
Semaphore(sem_size)
Создает семафор подсчета, разрешающий использовать максимум sem_size
получений в любое время. Каждое получение должно соответствовать освобождению.
Это дает возможность упорядочения памяти при получении и освобождении для получения или освобождения.
#
Base.acquire
— Function
acquire(s::Semaphore)
Ожидает доступности одного из sem_size
разрешений, выполняя блокировку до тех пор, пока не удастся его получить.
acquire(f, s::Semaphore)
Выполняет f
после получения из семафора s
и освобождения (release
) при завершении или ошибке.
Например, форма do-block, гарантирующая только два вызова foo
, будет активна одновременно:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Совместимость: Julia 1.8
Для этого метода требуется версия Julia не ниже 1.8. |
#
Base.release
— Function
release(s::Semaphore)
Возвращает одно разрешение в пул, по возможности разрешая другой задаче получить его и возобновить выполнение.
#
Base.lock
— Function
lock(lock)
Получает блокировку (lock
), когда она становится доступной. Если блокировка уже заблокирована другой задачей или другим потоком, ожидает, пока она не станет доступной.
Каждая блокировка (lock
) должна соответствовать разблокировке (unlock
).
lock(f::Function, lock)
Получает блокировку lock
, выполняет f
с удерживаемой блокировкой lock
и освобождает блокировку lock
при возврате управления f
. Если блокировка уже заблокирована другой задачей или другим потоком, ожидает, пока она не станет доступной.
Когда эта функция возвращается, блокировка (lock
) освобождается, поэтому вызывающий объект не должен пытаться разблокировать (unlock
) ее.
См. также описание @lock
.
Совместимость: Julia 1.7
Для использования |
lock(f::Function, l::Lockable)
Получает блокировку, связанную с l
, выполняет f
с удерживаемой блокировкой и освобождает блокировку при возврате управления f
. f
получит один позиционный аргумент: значение, заключенное в l
. Если блокировка уже заблокирована другой задачей или другим потоком, ожидает, пока она не станет доступной. Когда эта функция возвращается, блокировка (lock
) освобождается, поэтому вызывающий объект не должен пытаться разблокировать (unlock
) ее.
Совместимость: Julia 1.11
Требуется версия Julia не ниже 1.11. |
#
Base.unlock
— Function
unlock(lock)
Освобождает владение блокировкой lock
.
Если это рекурсивная блокировка, которая была получена ранее, уменьшает значение внутреннего счетчика и немедленно возвращает управление.
#
Base.trylock
— Function
trylock(lock) -> Success (Boolean)
Получает блокировку, если она доступна, и возвращает значение true
при успехе. Если блокировка уже заблокирована другой задачей или потоком, возвращает false
.
Каждая успешная блокировка trylock
должна соответствовать разблокировке (unlock
).
Функцию trylock
в сочетании с islocked
можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки если поддерживается `typeof(lock)` (ознакомьтесь с соответствующей документацией).
#
Base.islocked
— Function
islocked(lock) -> Status (Boolean)
Проверяет, удерживается ли блокировка lock
какой-либо задачей или потоком. Эта функция сама по себе не должна использоваться для синхронизации. Однако функцию islocked
в сочетании с trylock
можно использовать для написания алгоритмов «проверить-проверить-установить» или алгоритмов экспоненциальной задержки если поддерживается `typeof(lock)` (ознакомьтесь с соответствующей документацией).
Расширенная справка
Например, экспоненциальная задержка может быть реализована следующим образом, если реализация lock
удовлетворяет свойствам, описанным ниже.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Реализация
Для реализации блокировки рекомендуется определить islocked
со следующими свойствами и указать это в соответствующем docstring.
-
Функция
islocked(lock)
свободна от «гонки по данным». -
Если
islocked(lock)
возвращаетfalse
, немедленный вызовtrylock(lock)
должен быть успешным (возвращаетtrue
), если нет помех со стороны других задач.
#
Base.ReentrantLock
— Type
ReentrantLock()
Создает входящую блокировку для синхронизации задач Task
. Одна и та же задача может получить блокировку столько раз, сколько необходимо. Каждый lock
должен сопровождаться unlock
.
Вызов lock
также воспрепятствует выполнению методов завершения для этого потока до соответствующего unlock
. Использование стандартного шаблона блокировки, показанного ниже, должно естественным образом поддерживаться, но остерегайтесь инвертировать последовательность попыток блокировки или полностью пропускать блок повторных попыток (например, пытаться выполнить возврат при все еще удерживаемой блокировке):
Это дает возможность упорядочения памяти при получении и освобождении для блокировки или разблокировки вызовов.
lock(l) try <atomic work> finally unlock(l) end
Если !islocked(lck::ReentrantLock)
удерживается, trylock(lck)
выполняется успешно, если только нет других задач, пытающихся удержать блокировку в то же самое время.
#
Base.@lock
— Macro
@lock l expr
Версия lock(f, l::AbstractLock)
в виде макроса, но с выражением expr
вместо функции f
. Расширяется до следующего:
lock(l)
try
expr
finally
unlock(l)
end
Это аналогично использованию lock
с блоком do
, но позволяет избежать создания замыкания и, таким образом, может повысить производительность.
Совместимость
Макрос |
#
Base.Lockable
— Type
Lockable(value, lock = ReentrantLock())
Создает объект Lockable
, который инкапсулирует значение value
и связывает его с предоставленным lock
. Этот объект поддерживает @lock
, lock
, trylock
, unlock
. Для доступа к значению обратитесь по индексу к блокируемому объекту, удерживая блокировку.
Совместимость: Julia 1.11
Требуется версия Julia не ниже 1.11. |
Пример
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # для доступа к значению должна удерживаться блокировка
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Каналы
#
Base.Channel
— Type
Channel{T=Any}(size::Int=0)
Создает канал Channel
с внутренним буфером, который может содержать объекты типа T
максимальным числом до size
. put!
вызывает полный блок канала, пока объект не будет удален с помощью take!
.
Channel(0)
создает небуферизованный канал. put!
выполняет блокировку до вызова соответствующего take!
. И наоборот.
Другие конструкторы:
-
Channel()
: конструктор по умолчанию, эквивалентенChannel{Any}(0)
-
Channel(Inf)
: эквивалентенChannel{Any}(typemax(Int))
-
Channel(sz)
: эквивалентенChannel{Any}(sz)
Совместимость: Julia 1.3
Конструктор по умолчанию |
#
Base.Channel
— Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Создает новую задачу на основе func
, привязывает ее к новому каналу типа T
размером size
и планирует задачу; все это выполняется за один вызов. Канал автоматически закрывается при завершении работы задачи.
func
должна принимать привязанный канал в качестве единственного аргумента.
Если вам нужна ссылка на созданную задачу, передайте объект Ref{Task}
через именованный аргумент taskref
.
Если spawn=true
, задачу Task
, созданную для func
, можно запланировать для другого потока параллельно, что эквивалентно созданию задачи через Threads.@spawn
.
Если spawn=true
и аргумент threadpool
не задан, по умолчанию он равен :default
.
Если аргумент threadpool
установлен (в значение :default
или :interactive
), это подразумевает, что spawn=true
и новая задача создается в указанном пуле потоков.
Возвращает Channel
.
Примеры
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Создает ссылку на созданную задачу
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Совместимость: Julia 1.3
Параметр |
Совместимость: Julia 1.9
Аргумент |
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
#
Base.put!
— Method
put!(c::Channel, v)
Добавляет элемент v
к каналу c
. Выполняет блокировку, если канал полон.
Для небуферизованных каналов выполняет блокировку, пока take!
не будет выполнено другой задачей.
Совместимость: Julia 1.1
|
#
Base.take!
— Method
take!(c::Channel)
Удаляет и возвращает значение из Channel
по порядку. Выполняет блокировку, пока данные не станут доступны. Для небуферизованных каналов выполняет блокировку, пока put!
не будет выполнено другой задачей.
Примеры
Буферизованный канал:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Небуферизованный канал:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
#
Base.isready
— Method
isready(c::Channel)
Определяет, имеет ли Channel
значение, хранимое в нем. Возвращает управление немедленно, блокировка не выполняется.
Для небуферизованных каналов возвращает значение true
, если имеются задачи, ожидающие put!
.
Примеры
Буферизованный канал:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Небуферизованный канал:
julia> c = Channel();
julia> isready(c) # нет задач, ожидающих put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # планирование задачи put!
julia> isready(c)
true
#
Base.fetch
— Method
fetch(c::Channel)
Ожидает и возвращает (без удаления) первый доступный элемент из Channel
. Примечание. fetch
не поддерживается в небуферизованном канале (с нулевым размером) Channel
.
Примеры
Буферизованный канал:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # элемент не удаляется
3-element Vector{Any}:
1
2
3
#
Base.bind
— Method
bind(chnl::Channel, task::Task)
Сопоставляет время существования chnl
с задачей. Channel
chnl
автоматически закрывается при завершении работы задачи. Любое необнаруженное исключение в задаче распространяется на все ожидающие объекты для chnl
.
Объект chnl
можно явным образом закрыть независимо от завершения работы задачи. Завершение работы задач не влияет на уже закрытые объекты Channel
.
Когда канал привязан к нескольким задачам, первая задача, работу которой следует прекратить, закроет канал. Когда несколько каналов привязаны к одной и той же задаче, прекращение работы задачи приведет к закрытию всех связанных каналов.
Примеры
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Низкоуровневая синхронизация с использованием schedule
и wait
Проще всего правильно использовать schedule
в задаче Task
, которая еще не запущена (запланирована). Тем не менее schedule
и wait
можно использовать в качестве низкоуровневого стандартного блока для построения интерфейсов синхронизации. Важнейшим предварительным условием вызова schedule(task)
является то, что вызывающая сторона должна «владеть» task
. То есть она должна знать, что вызов wait
в данном task
происходит в местах, известных коду, вызывающему schedule(task)
. Одна из стратегий обеспечения такого предварительного условия заключается в использовании атомарных операций, как показано в следующем примере:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Выполнять до тех пор, пока состояние не будет обновлено до OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# Переход OWE_WAITING -> OWE_NOTIFYING означает, что задача ожидания
# уже ждет или собирается вызвать `wait`. Задача уведомления должна пробудить
# задачу ожидания.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Поскольку предполагается, что существует только одна задача уведомления (для простоты),
# мы знаем, что другим возможным случаем является OWE_EMPTY.
# Нам не нужно ничего делать, потому что мы знаем, что задача ожидания еще не
# вызвала `wait(ev::OneWayEvent)`.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# Переход OWE_EMPTY -> OWE_WAITING означает, что задача уведомления гарантированно
# вызовет переход OWE_WAITING -> OWE_NOTIFYING. Задача ожидания должна
# немедленно вызвать`wait()`. В частности, она не должна вызывать никаких функций, которые
# могут выдавать данные в планировщик в этой точке кода.
wait()
else
@assert state == OWE_NOTIFYING
# В противном случае `state` уже должно быть переведено в OWE_NOTIFYING
# задачей уведомления.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# вывод
notifying...
done
OneWayEvent
позволяет одной задаче ждать (wait
) уведомления (notify
) от другой задачи. Это ограниченный коммуникационный интерфейс, поскольку wait
можно использовать только один раз из одной задачи (обратите внимание на неатомарное присваивание ev.task
).
В этом примере notify(ev::OneWayEvent)
может вызывать schedule(ev.task)
тогда и только тогда, когда она изменяет состояние с OWE_WAITING
на OWE_NOTIFYING
. Таким образом мы узнаем, что задача, выполняющая wait(ev::OneWayEvent)
, теперь находится в ветви ok
, и что не может быть других задач, которые пытаются выполнить schedule(ev.task)
, так как их макрос @atomicreplace(ev.state, state => OWE_NOTIFYING)
завершится ошибкой.