Tasks
#
Core.Task
— Type
Task(func)
Creates a Task (i.e. a coroutine) to execute the given function func (which must be callable with no arguments). The task exits when this function returns. When scheduled with schedule, the task runs in the "world age" of the parent at the time of construction.
By default, the task's sticky bit is set to true.
Examples
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
In this example, b is a runnable Task that has not started yet.
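A minimal sketch of the full lifecycle of such a task: create it, schedule it, and read its result with fetch. The function name compute is a hypothetical placeholder, not part of the API.

```julia
# `compute` is a hypothetical zero-argument function used only for illustration.
compute() = sum(i for i in 1:1000)

t = Task(compute)                   # created, but not yet started
started_before = istaskstarted(t)

schedule(t)                         # hand the task to the scheduler
result = fetch(t)                   # block until the task exits and return its value

started_after = istaskstarted(t)
```

fetch is used here instead of yield() because it waits for this particular task rather than relying on scheduling order.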
#
Base.@task
— Macro
@task
Wraps an expression in a Task without executing it, and returns the Task. This only creates a task; it does not run it.
By default, the task's sticky bit is set to true.
Examples
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.@async
— Macro
@async
Wraps an expression in a Task and adds it to the local machine's scheduler queue.
Values can be interpolated into @async via $, which copies the value directly into the constructed underlying closure. This lets you insert the value of a variable, isolating the asynchronous code from later changes to that variable in the current task.
It is strongly recommended to always give preference to |
Compatibility: Julia 1.4
Interpolation of values using |
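The effect of $ interpolation can be sketched as follows. The helper name interpolation_demo is hypothetical, and the sketch assumes Julia 1.4 or later and that neither task runs before the current task blocks in fetch.

```julia
# `interpolation_demo` is a hypothetical helper used only for illustration.
function interpolation_demo()
    x = 1
    copied = @async identity($x)   # the value 1 is copied in when the task is created
    shared = @async identity(x)    # the closure reads `x` when the task actually runs
    x = 2                          # reassigned before either task has started
    return fetch(copied), fetch(shared)
end

copied_value, shared_value = interpolation_demo()
```

The interpolated task keeps the value from creation time, while the non-interpolated one observes the later assignment.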
#
Base.asyncmap
— Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
Uses multiple concurrent tasks to map f over a collection (or multiple collections of equal length). For multiple collection arguments, f is applied elementwise.
ntasks specifies the number of tasks to run concurrently. If ntasks is not specified, up to 100 tasks are used for concurrent mapping, depending on the length of the collections.
ntasks can also be specified as a zero-argument function. In this case, the number of tasks to run concurrently is checked before processing each element, and a new task is started if the value of ntasks_func is greater than the current number of tasks.
If batch_size is specified, the collection is processed in batch mode. In this case, f must be a function that accepts a vector of argument tuples and returns a vector of results. The input vector has a length of batch_size or less.
The following examples highlight execution in different tasks by returning the objectid of the tasks in which the mapping function is executed.
First, with ntasks undefined, each element is processed in a different task.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
With ntasks=2, all elements are processed in two tasks.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
With batch_size defined, the mapping function must be changed to accept an array of argument tuples and return an array of results. map is used in the modified mapping function to achieve this.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
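For a more everyday use, the sketch below maps a slow function over a range with a bounded number of tasks, and then over two collections at once. slow_square is a hypothetical stand-in for an I/O-bound operation.

```julia
# `slow_square` is a hypothetical stand-in for an I/O-bound operation.
slow_square(x) = (sleep(0.01); x^2)

# At most 3 tasks are active at a time; results keep the input order.
squares = asyncmap(slow_square, 1:6; ntasks=3)

# Two collections of equal length: f receives one element from each.
sums = asyncmap(+, 1:3, 10:10:30)
```

Because the tasks overlap their sleeps, this typically finishes faster than a plain map, although all tasks still run on one thread.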
#
Base.istaskdone
— Function
istaskdone(t::Task) -> Bool
Determines whether the task has completed its work.
Examples
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
#
Base.istaskstarted
— Function
istaskstarted(t::Task) -> Bool
Determines whether the task has started execution.
Examples
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
#
Base.istaskfailed
— Function
istaskfailed(t::Task) -> Bool
Determines whether the task has completed due to an exception.
Examples
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Compatibility: Julia 1.3
This function requires at least Julia 1.3.
#
Base.task_local_storage
— Method
task_local_storage(key)
Looks up the value of a key in the current task's task-local storage.
#
Base.task_local_storage
— Method
task_local_storage(key, value)
Assigns a value to a key in the current task's task-local storage.
#
Base.task_local_storage
— Method
task_local_storage(body, key, value)
Calls the function body with a modified task-local storage in which value is assigned to key; the previous value of key, or its absence, is restored afterwards. Useful for emulating dynamic scoping.
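The three methods can be sketched together as follows. The key :request_id is an arbitrary name chosen for illustration.

```julia
# `:request_id` is an arbitrary key chosen for illustration.
task_local_storage(:request_id, 1)

# Temporarily override the value while the do-block runs.
inner = task_local_storage(:request_id, 2) do
    task_local_storage(:request_id)
end

# After the block, the previous value is visible again.
outer = task_local_storage(:request_id)

# A different task has its own, separate storage.
seen_in_child = fetch(@async haskey(task_local_storage(), :request_id))
```

The zero-argument call task_local_storage() returns the current task's storage dictionary, which is why the child task does not see the key.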
Scheduling
#
Base.yield
— Function
yield()
Switches to the scheduler to allow the execution of another scheduled task. The task that calls this function is still ready to run and will be restarted immediately if there are no other tasks ready to run.
yield(t::Task, arg = nothing)
A fast, unfair-scheduling version of schedule(t, arg); yield() which immediately yields to t before calling the scheduler.
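A minimal sketch of cooperative multitasking with yield(): two tasks take turns by yielding after each step. The names events and worker are hypothetical, and the exact interleaving is left to the scheduler.

```julia
events = String[]

# `worker` is a hypothetical helper: it records a step, then yields.
function worker(name)
    for step in 1:2
        push!(events, "$name$step")
        yield()              # let other runnable tasks take a turn
    end
end

@sync begin
    @async worker("a")
    @async worker("b")
end
```

Without the yield() call, each worker would run to completion before the other one started.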
#
Base.yieldto
— Function
yieldto(t::Task, arg = nothing)
Switches to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, arg is returned from the task's last call to yieldto. This is a low-level call that only switches tasks, without considering states or scheduling in any way. Its use is discouraged.
#
Base.sleep
— Function
sleep(seconds)
Blocks the current task for the specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.
#
Base.schedule
— Function
schedule(t::Task, [val]; error=false)
Adds a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.
If a second argument val is provided, it is passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.
It is incorrect to use |
By default, the task's sticky bit is set to true.
Examples
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
#
Base.errormonitor
— Function
errormonitor(t::Task)
Prints an error log to stderr if task t fails.
Examples
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
#
Base.@sync
— Macro
@sync
Waits until all lexically enclosed uses of @async, @spawn, Distributed.@spawnat and Distributed.@distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
Examples
julia> Threads.nthreads()
4
julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
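The exception behavior can be sketched as follows: two enclosed tasks fail, and @sync rethrows both failures in a single CompositeException. The variable names are hypothetical.

```julia
caught = try
    @sync begin
        @async error("first failure")
        @async error("second failure")
    end
    nothing
catch err
    err                      # the CompositeException thrown by @sync
end

# Each collected entry wraps the task that failed.
failure_types = [typeof(e) for e in caught.exceptions]
```

Each entry is a TaskFailedException, so the original error is available through the wrapped task.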
#
Base.wait
— Function
Special note for Threads.Condition:
The caller must be holding the lock that owns a Threads.Condition before calling this method. The calling task is blocked until some other task wakes it, usually by calling notify on the same Threads.Condition object. The lock is atomically released when blocking (even if it was locked recursively) and is reacquired before returning.
wait(r::Future)
Wait for a value to become available for the specified Future
.
wait(r::RemoteChannel, args...)
Wait for a value to become available on the specified RemoteChannel
.
wait([x])
Blocks the current task until some event occurs, depending on the type of the argument:
- Channel: Wait for a value to be appended to the channel.
- Condition: Wait for notify on a condition and return the val parameter passed to notify. Waiting on a condition additionally allows passing first=true, which puts the waiter first in line to wake up on notify instead of the usual first-in-first-out behavior.
- Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
- Task: Wait for a Task to finish. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
- RawFD: Wait for changes on a file descriptor (see the FileWatching package).
If no argument is passed, the task blocks for an undefined period. The task can only be restarted by an explicit call to schedule or yieldto.
wait is often called within a while loop to ensure a waited-for condition is met before proceeding.
wait(c::Channel)
Blocks until the Channel is ready (isready).
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # task is blocked because the channel is not ready
false
julia> put!(c, 1);
julia> istaskdone(task) # task is now unblocked
true
#
Base.fetch
— Method
fetch(t::Task)
Waits for a Task to finish, then returns its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
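A short sketch of both outcomes, using hypothetical variable names: one task returns a value, the other fails.

```julia
ok_task = @async 6 * 7
answer = fetch(ok_task)          # waits, then returns the task's result

bad_task = @async error("boom")
wrapped = try
    fetch(bad_task)
catch err
    err                          # fetch rethrows the failure in wrapped form
end

# The wrapper records which task failed.
same_task = wrapped.task === bad_task
```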
#
Base.timedwait
— Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
Waits until testcb() returns true or timeout seconds have passed, whichever is earlier. The test function is polled every pollint seconds. The minimum value for pollint is 0.001 seconds, that is, 1 millisecond.
Returns :ok or :timed_out.
Examples
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
#
Base.Condition
— Type
Condition()
Creates an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Waiting on a condition can return a value or raise an error if the optional arguments of notify are used. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to track whether a notification has happened. The Channel and Threads.Event types do this and can be used for level-triggered events.
This object is not thread-safe. See Threads.Condition for a thread-safe version.
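The edge-triggered behavior can be sketched as follows: a notify issued before anyone waits is lost, while a later notify wakes the waiter and delivers its value. The variable names are hypothetical, and the sketch assumes the @async task runs on the same thread as its parent.

```julia
cond = Condition()

missed = notify(cond, :too_early)   # nobody is waiting yet, so nothing is woken

waiter = @async wait(cond)          # wait returns the value passed to notify
yield()                             # let the waiter run until it blocks on cond

woken = notify(cond, :go)           # returns the number of tasks woken
received = fetch(waiter)
```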
#
Base.Threads.Condition
— Type
Threads.Condition([lock])
A thread-safe version of Base.Condition.
To call wait or notify on a Threads.Condition, you must first call lock on it. When wait is called, the lock is atomically released while blocking and is reacquired before wait returns. Therefore, idiomatic use of a Threads.Condition c looks like this:
lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Compatibility: Julia 1.2
This functionality requires at least Julia 1.2.
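A minimal sketch of the idiom with both sides shown, using a hypothetical ready flag as the predicate. It works whether the consumer starts waiting before or after the notification.

```julia
cond = Threads.Condition()
ready = Ref(false)            # shared state, only touched while holding the lock

consumer = Threads.@spawn begin
    lock(cond)
    try
        while !ready[]        # re-check the predicate after every wakeup
            wait(cond)        # releases the lock while blocked
        end
        :saw_ready
    finally
        unlock(cond)
    end
end

lock(cond)
try
    ready[] = true
    notify(cond)              # must also be called with the lock held
finally
    unlock(cond)
end

outcome = fetch(consumer)
```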
#
Base.Event
— Type
Event([autoreset=false])
Creates a level-triggered event source. Tasks that call wait on an Event are suspended and queued until notify is called on the Event. After notify is called, the Event remains in a signaled state and tasks no longer block when waiting for it, until reset is called.
If autoreset is true, at most one task is released from wait for each call to notify.
This provides acquire and release memory ordering on notify/wait.
Compatibility: Julia 1.1
This functionality requires at least Julia 1.1.
Compatibility: Julia 1.8
The |
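The level-triggered behavior can be sketched as follows, with hypothetical variable names: once notified, the event stays signaled, so a later wait returns immediately until reset is called.

```julia
ev = Base.Event()

waiter = @async (wait(ev); :released)
yield()                          # give the waiter a chance to block on the event
blocked = !istaskdone(waiter)

notify(ev)                       # the event becomes signaled and stays that way
released = fetch(waiter)

wait(ev)                         # returns immediately: the event is still set
reset(ev)                        # back to the unsignaled state
```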
#
Base.notify
— Function
notify(condition, val=nothing; all=true, error=false)
Wakes up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.
Returns the number of tasks woken up. Returns 0 if no tasks are waiting on condition.
#
Base.Semaphore
— Type
Semaphore(sem_size)
Creates a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.
This provides acquire and release memory ordering on acquire/release calls.
#
Base.acquire
— Function
acquire(s::Semaphore)
Waits for one of the sem_size permits to be available, blocking until one can be acquired.
acquire(f, s::Semaphore)
Executes f after acquiring from the Semaphore s, and releases on completion or error.
For example, this do-block form ensures that at most two calls to foo are active at the same time:
s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Compatibility: Julia 1.8
This method requires at least Julia 1.8.
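To see the limit in action, the sketch below tracks how many tasks are inside the guarded section at once. The counters active and peak are hypothetical instrumentation, and explicit acquire/release calls are used so the sketch does not depend on the do-block method.

```julia
sem = Base.Semaphore(2)
active = Threads.Atomic{Int}(0)   # tasks currently inside the guarded section
peak = Threads.Atomic{Int}(0)     # highest value of `active` observed

@sync for _ in 1:10
    Threads.@spawn begin
        Base.acquire(sem)
        try
            now_active = Threads.atomic_add!(active, 1) + 1  # atomic_add! returns the old value
            Threads.atomic_max!(peak, now_active)
            sleep(0.01)                                      # simulated work
            Threads.atomic_sub!(active, 1)
        finally
            Base.release(sem)                                # always pair with acquire
        end
    end
end
```

With a semaphore of size 2, peak never exceeds 2, regardless of how many threads are available.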
#
Base.release
— Function
release(s::Semaphore)
Returns one permit to the pool, possibly allowing another task to acquire it and resume execution.
#
Base.lock
— Function
lock(lock)
Acquires the lock when it becomes available. If the lock is already held by a different task or thread, waits for it to become available.
Each lock must be matched by an unlock.
lock(f::Function, lock)
Acquires the lock, executes f with the lock held, and releases the lock when f returns. If the lock is already held by a different task or thread, waits for it to become available.
When this function returns, the lock has been released, so the caller should not attempt to unlock it.
See also: @lock.
Compatibility: Julia 1.7
To use |
lock(f::Function, l::Lockable)
Acquires the lock associated with l, executes f with the lock held, and releases the lock when f returns. f receives one positional argument: the value wrapped by l. If the lock is already held by a different task or thread, waits for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.
Compatibility: Julia 1.11
Requires at least Julia 1.11.
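A minimal sketch of the do-block form protecting a shared counter; the names lk and counter are hypothetical.

```julia
lk = ReentrantLock()
counter = Ref(0)

@sync for _ in 1:1000
    Threads.@spawn begin
        lock(lk) do
            counter[] += 1    # read-modify-write is safe only while the lock is held
        end
    end
end
```

Without the lock, concurrent increments on multiple threads could be lost.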
#
Base.unlock
— Function
unlock(lock)
Releases ownership of the lock.
If this is a recursive lock that has been acquired before, decrements an internal counter and returns immediately.
#
Base.trylock
— Function
trylock(lock) -> Success (Boolean)
Acquires the lock if it is available, and returns true if successful. If the lock is already held by a different task or thread, returns false.
Each successful trylock must be matched by an unlock.
trylock combined with islocked can be used to write test-and-test-and-set or exponential backoff algorithms if typeof(lock) supports it (see its documentation).
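A short sketch of both outcomes with a ReentrantLock, using hypothetical variable names:

```julia
lk = ReentrantLock()

lock(lk)
# A different task cannot take the lock while this task holds it.
other_got_it = fetch(Threads.@spawn trylock(lk))
unlock(lk)

# Now the lock is free, so trylock succeeds and must be paired with unlock.
got_it = trylock(lk)
got_it && unlock(lk)
still_locked = islocked(lk)
```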
#
Base.islocked
— Function
islocked(lock) -> Status (Boolean)
Checks whether the lock is held by any task or thread. This function alone should not be used for synchronization. However, islocked combined with trylock can be used to write test-and-test-and-set or exponential backoff algorithms if typeof(lock) supports it (see its documentation).
Extended help
For example, an exponential backoff can be implemented as follows, provided the lock implementation satisfies the properties documented below.
nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end
Implementation
A lock implementation is advised to define islocked with the following properties and note them in its docstring.
- islocked(lock) is data-race-free.
- If islocked(lock) returns false, an immediate invocation of trylock(lock) must succeed (return true) if there is no interference from other tasks.
#
Base.ReentrantLock
— Type
ReentrantLock()
Creates a re-entrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required. Each lock must be matched with an unlock.
Calling lock also inhibits running of finalizers on that thread until the corresponding unlock. This provides acquire/release memory ordering on lock/unlock calls. Use of the standard lock pattern illustrated below should naturally be supported, but beware of inverting the try/lock order or missing the try block entirely (for example, attempting to return with the lock still held):
lock(l)
try
    <atomic work>
finally
    unlock(l)
end
If !islocked(lck::ReentrantLock) holds, trylock(lck) succeeds unless other tasks are attempting to hold the lock at the same time.
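Re-entrancy can be sketched as follows: the same task acquires the lock twice, and the lock is only free after both acquisitions are released. The variable names are hypothetical.

```julia
lk = ReentrantLock()

lock(lk)
lock(lk)                                # same task: acquired again without blocking
unlock(lk)
held_after_one_unlock = islocked(lk)    # one unlock is still outstanding

unlock(lk)
held_after_two_unlocks = islocked(lk)
```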
#
Base.@lock
— Macro
@lock l expr
The macro version of lock(f, l::AbstractLock), but with the expression expr instead of the function f. Expands to:
lock(l)
try
    expr
finally
    unlock(l)
end
This is similar to using lock with a do block, but avoids creating a closure and thus can improve performance.
Compatibility
The macro |
#
Base.Lockable
— Type
Lockable(value, lock = ReentrantLock())
Creates a Lockable object that wraps value and associates it with the provided lock. This object supports @lock, lock, trylock, and unlock. To access the value, index the lockable object while holding the lock.
Compatibility: Julia 1.11
Requires at least Julia 1.11.
Example
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # the lock must be held to access the value
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
#
Base.AbstractChannel
— Type
AbstractChannel{T}
Representation of a channel passing objects of type T.
#
Base.Channel
— Type
Channel{T=Any}(size::Int=0)
Constructs a Channel with an internal buffer that can hold a maximum of size objects of type T. put! calls on a full channel block until an object is removed with take!.
Channel(0) constructs an unbuffered channel. put! blocks until a matching take! is called, and vice versa.
Other constructors:
- Channel(): default constructor, equivalent to Channel{Any}(0)
- Channel(Inf): equivalent to Channel{Any}(typemax(Int))
- Channel(sz): equivalent to Channel{Any}(sz)
Compatibility: Julia 1.3
The default constructor is |
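A minimal producer/consumer sketch with a buffered channel. The names ch, producer and received are hypothetical.

```julia
ch = Channel{Int}(2)              # buffer holds at most 2 items

producer = @async begin
    for i in 1:5
        put!(ch, i)               # blocks whenever the buffer is full
    end
    close(ch)                     # lets the consumer's iteration terminate
end

received = collect(ch)            # takes items until the channel is closed and drained
wait(producer)
```

Closing the channel is what ends the iteration. Without it, collect would block forever once the buffer is empty.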
#
Base.Channel
— Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Creates a new task from func, binds it to a new channel of type T and size size, and schedules the task, all in a single call. The channel is automatically closed when the task terminates.
func must accept the bound channel as its only argument.
If you need a reference to the created task, pass a Ref{Task} object via the keyword argument taskref.
If spawn=true, the Task created for func may be scheduled on another thread in parallel, equivalent to creating a task via Threads.@spawn.
If spawn=true and the threadpool argument is not set, it defaults to :default.
If the threadpool argument is set (to :default or :interactive), this implies spawn=true and the new Task is spawned to the specified threadpool.
Returns a Channel.
Examples
julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4
Referencing the created task:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Compatibility: Julia 1.3
The |
Compatibility: Julia 1.9
The |
julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
#
Base.put!
— Method
put!(c::Channel, v)
Appends an item v to the channel c. Blocks if the channel is full.
For unbuffered channels, blocks until a take! is performed by a different task.
Compatibility: Julia 1.1
The |
#
Base.take!
— Method
take!(c::Channel)
Removes and returns a value from a Channel in order. Blocks until data is available. For unbuffered channels, blocks until a put! is performed by a different task.
Examples
Buffered channel:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Unbuffered channel:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
#
Base.isready
— Method
isready(c::Channel)
Determines whether a Channel has a value stored in it. Returns immediately, does not block.
For unbuffered channels, returns true if there are tasks waiting on a put!.
Examples
Buffered channel:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Unbuffered channel:
julia> c = Channel();
julia> isready(c) # no tasks waiting to put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # schedule a put! task
julia> isready(c)
true
#
Base.fetch
— Method
fetch(c::Channel)
Waits for and returns (without removing) the first available item from the Channel. Note: fetch is not supported on an unbuffered (0-size) Channel.
Examples
Buffered channel:
julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;
julia> fetch(c)
1
julia> collect(c) # item is not removed
3-element Vector{Any}:
1
2
3
#
Base.bind
— Method
bind(chnl::Channel, task::Task)
Associates the lifetime of chnl with a task. The Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.
The chnl object can be explicitly closed independently of task termination. Terminating tasks have no effect on already closed Channel objects.
When a channel is bound to multiple tasks, the first task to terminate closes the channel. When multiple channels are bound to the same task, termination of the task closes all of the bound channels.
Examples
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule and wait
The easiest correct use of schedule is on a Task that has not been started (scheduled) yet. However, schedule and wait can also be used as a very low-level building block for constructing synchronization interfaces. A crucial precondition of calling schedule(task) is that the caller must "own" the task; that is, it must know that the call to wait in the given task happens at locations known to the code calling schedule(task). One strategy for ensuring such a precondition is to use atomics, as demonstrated in the following example:
@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end
mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # The OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task
                # is already waiting or is about to call `wait`. The notifier task must
                # wake up the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we assume that there is only one notifier task (for simplicity),
                # we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end
function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # The OWE_EMPTY -> OWE_WAITING transition means that the notifier task is
        # guaranteed to invoke the OWE_WAITING -> OWE_NOTIFYING transition. The waiter
        # task must call `wait()` immediately. In particular, it must not invoke any
        # function that may yield to the scheduler at this point in the code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end
ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end
# output
notifying...
done
OneWayEvent lets one task wait for a notify from another task. It is a limited communication interface, since wait can only be used once from a single task (note the non-atomic assignment of ev.task).
In this example, notify(ev::OneWayEvent) is allowed to call schedule(ev.task) if and only if it changes the state from OWE_WAITING to OWE_NOTIFYING. This tells us that the task executing wait(ev::OneWayEvent) is now in the ok branch, and that no other task can try to call schedule(ev.task), since its @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.