API
Nodes
DispatchNode
Dispatcher.DispatchNode
— Type.A DispatchNode represents a unit of computation that can be run. A DispatchNode may depend on other DispatchNodes, which are returned from the dependencies function.
Dispatcher.get_label
— Method.get_label(node::DispatchNode) -> String
Returns a node's label. By default, DispatchNodes do not support labels, so this method will error.
Dispatcher.set_label!
— Method.set_label!(node::DispatchNode, label)
Sets a node's label. By default, DispatchNodes do not support labels, so this method will error. Actual method implementations should return their second argument.
Dispatcher.has_label
— Method.has_label(node::DispatchNode) -> Bool
Returns true or false depending on whether the node has a label (i.e., a get_label(::DispatchNode) method).
Dispatcher.dependencies
— Method.dependencies(node::DispatchNode) -> Tuple{Vararg{DispatchNode}}
Return all dependencies which must be ready before executing this node. Unless given a dependencies method, a DispatchNode is assumed to have no dependencies.
Dispatcher.prepare!
— Method.prepare!(node::DispatchNode)
Execute some action on a node before dispatching nodes via an Executor. The default method performs no action.
Dispatcher.run!
— Method.run!(node::DispatchNode)
Execute a node's action as part of dispatch. The default method performs no action.
Base.isready
— Method.isready(node::DispatchNode) -> Bool
Determine whether a node has an available result. The default method assumes no synchronization is involved in retrieving that result.
Base.wait
— Method.wait(node::DispatchNode)
Block the current task until a node has a result available.
Base.fetch
— Method.fetch(node::DispatchNode) -> Any
Fetch a node's result if available, blocking until it is available. All subtypes of DispatchNode should implement this, so the default method throws an error.
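The interface above can be sketched with a hypothetical custom node. fetch is the only required method; the default methods cover dependencies, prepare!, and run! (ConstNode below is illustrative only, not part of the package):
using Dispatcher
# A hypothetical node that wraps a constant value.
struct ConstNode{T} <: DispatchNode
    value::T
end
# fetch is the one method every DispatchNode subtype must implement.
Base.fetch(node::ConstNode) = node.value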
Op
Dispatcher.Op
— Type.An Op is a DispatchNode which wraps a function that is executed when the Op is run. The result of that function call is stored in the result DeferredFuture. Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. This is the most common DispatchNode.
Dispatcher.Op
— Method.Op(func::Function, args...; kwargs...) -> Op
Construct an Op which represents the delayed computation of func(args...; kwargs...). Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. The default label of an Op is the name of func.
Dispatcher.@op
— Macro.@op func(...)
The @op macro makes it more convenient to construct Op nodes. It translates a function call into an Op call, effectively deferring the computation.
a = @op sort(1:10; rev=true)
is equivalent to
a = Op(sort, 1:10; rev=true)
Dispatcher.get_label
— Method.get_label(op::Op) -> String
Returns the op.label.
Dispatcher.set_label!
— Method.set_label!(op::Op, label::AbstractString)
Set the op's label. Returns its second argument.
Dispatcher.has_label
— Method.has_label(::Op) -> Bool
Always return true, as an Op will always have a label.
Dispatcher.dependencies
— Method.dependencies(op::Op) -> Tuple{Vararg{DispatchNode}}
Return all dependencies which must be ready before executing this Op. This will be all DispatchNodes in the Op's function args and kwargs.
Dispatcher.prepare!
— Method.prepare!(op::Op)
Replace an Op's result field with a fresh, empty one.
Dispatcher.run!
— Method.run!(op::Op)
Fetch an Op's dependencies and execute its function. Store the result in its result::DeferredFuture field.
Base.isready
— Method.isready(op::Op) -> Bool
Determine whether an Op has an available result.
Base.wait
— Method.wait(op::Op)
Wait until an Op has an available result.
Base.fetch
— Method.fetch(op::Op) -> Any
Return the result of the Op, blocking until it is available. Throw a DependencyError in the event that the result is a DependencyError.
Base.summary
— Method.summary(op::Op)
Returns a string representation of the Op with its label and the args/kwargs types.
NOTE: if an arg/kwarg is a DispatchNode with a label, it will be printed with that label.
DataNode
Dispatcher.DataNode
— Type.A DataNode is a DispatchNode which wraps a piece of static data.
Base.fetch
— Method.fetch{T}(node::DataNode{T}) -> T
Immediately return the data contained in a DataNode.
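For example (assuming Dispatcher is loaded):
node = DataNode([1, 2, 3])
fetch(node)  # returns [1, 2, 3] immediately, without involving an executor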
IndexNode
Dispatcher.IndexNode
— Type.An IndexNode refers to an element of the return value of a DispatchNode. It is meant to handle multiple return values from a DispatchNode.
Example:
n1, n2 = Op(() -> divrem(5, 2))
run!(exec, [n1, n2])
@assert fetch(n1) == 2
@assert fetch(n2) == 1
In this example, n1 and n2 are created as IndexNodes pointing to the Op at index 1 and index 2 respectively.
Dispatcher.IndexNode
— Method.IndexNode(node::DispatchNode, index) -> IndexNode
Create a new IndexNode referring to the result of node at index.
Dispatcher.dependencies
— Method.dependencies(node::IndexNode) -> Tuple{DispatchNode}
Return the dependency that this node will fetch data (at a certain index) from.
Dispatcher.prepare!
— Method.prepare!(node::IndexNode)
Replace an IndexNode's result field with a fresh, empty one.
Dispatcher.run!
— Method.run!(node::IndexNode) -> DeferredFuture
Fetch data from the IndexNode's parent at the IndexNode's index, performing the indexing operation on the process where the data lives. Store the data from that index in a DeferredFuture in the IndexNode.
Base.isready
— Method.isready(node::IndexNode) -> Bool
Determine whether an IndexNode has an available result.
Base.wait
— Method.wait(node::IndexNode)
Wait until an IndexNode has an available result.
Base.fetch
— Method.fetch(node::IndexNode) -> Any
Return the stored result of indexing.
Base.summary
— Method.summary(node::IndexNode)
Returns a string representation of the IndexNode with a summary of the wrapped node and the node index.
CollectNode
Dispatcher.CollectNode
— Type.CollectNode{T<:DispatchNode}(nodes::Vector{T}) -> CollectNode{T}
Create a node which gathers an array of nodes and stores an array of their results in its result field.
CollectNode(nodes) -> CollectNode{DispatchNode}
Create a CollectNode from any iterable of nodes.
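A brief sketch using Ops and an AsyncExecutor (both described elsewhere in this document):
exec = AsyncExecutor()
ops = [Op(() -> i^2) for i in 1:3]
collected = CollectNode(ops)
run!(exec, [collected])
fetch(collected)  # a Vector containing the three squared values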
Dispatcher.get_label
— Method.get_label(node::CollectNode) -> String
Returns the node.label.
Dispatcher.set_label!
— Method.set_label!(node::CollectNode, label::AbstractString) -> AbstractString
Set the node's label. Returns its second argument.
Dispatcher.has_label
— Method.has_label(::CollectNode) -> Bool
Always return true, as a CollectNode will always have a label.
Dispatcher.dependencies
— Method.dependencies{T<:DispatchNode}(node::CollectNode{T}) -> Vector{T}
Return the nodes this node depends on, which are the nodes it will collect.
Dispatcher.prepare!
— Method.prepare!(node::CollectNode)
Initialize a CollectNode with a fresh result DeferredFuture.
Dispatcher.run!
— Method.run!(node::CollectNode)
Collect all of a CollectNode's dependencies' results into a Vector and store that in this node's result field. Returns nothing.
Base.isready
— Method.isready(node::CollectNode) -> Bool
Determine whether a CollectNode has an available result.
Base.wait
— Method.wait(node::CollectNode)
Block until a CollectNode has an available result.
Base.fetch
— Method.fetch(node::CollectNode) -> Vector
Return the result of the collection. Block until it is available.
Base.summary
— Method.summary(node::CollectNode)
Returns a string representation of the CollectNode with its label.
Graph
DispatchGraph
Dispatcher.DispatchGraph
— Type.DispatchGraph wraps a directed graph from LightGraphs and a bidirectional dictionary mapping between DispatchNode instances and vertex numbers in the graph.
Dispatcher.nodes
— Method.nodes(graph::DispatchGraph)
Return an iterable of all nodes stored in the DispatchGraph.
Base.length
— Method.length(graph::DispatchGraph) -> Integer
Return the number of nodes in the graph.
Base.push!
— Method.push!(graph::DispatchGraph, node::DispatchNode) -> DispatchGraph
Add a node to the graph and return the graph.
LightGraphs.add_edge!
— Method.add_edge!(graph::DispatchGraph, parent::DispatchNode, child::DispatchNode) -> Bool
Add an edge to the graph from parent to child. Return whether the operation was successful.
Base.:==
— Method.graph1::DispatchGraph == graph2::DispatchGraph
Determine whether two graphs have the same nodes and edges. This is an expensive operation.
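A sketch of working with a graph using the functions above (construction from a node list also appears in the Usage examples later in this document):
a = Op(() -> 1)
b = Op(x -> x + 1, a)        # b lists a as a dependency
graph = DispatchGraph([a, b])
length(graph)                # 2
push!(graph, Op(() -> 0))    # add another node; returns the graph
length(graph)                # 3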
Executors
Executor
Dispatcher.Executor
— Type.An Executor handles execution of DispatchGraphs.
A type T <: Executor must implement dispatch!(::T, ::DispatchNode) and may optionally implement dispatch!(::T, ::DispatchGraph; throw_error=true).
The function call tree will look like this when an executor is run:
run!(exec, context)
    prepare!(exec, context)
        prepare!(nodes[i])
    dispatch!(exec, context)
        dispatch!(exec, nodes[i])
            run!(nodes[i])
NOTE: Currently, it is expected that dispatch!(::T, ::DispatchNode) returns something that can be waited on (i.e., a Task, Future, Channel, DispatchNode, etc.)
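As a hedged sketch of that minimal contract (SerialExecutor is hypothetical, not part of the package; it relies on a DispatchNode itself being an acceptable value to wait on):
# Hypothetical executor that runs each node eagerly on the current task.
struct SerialExecutor <: Executor end

function Dispatcher.dispatch!(exec::SerialExecutor, node::DispatchNode)
    run!(node)   # execute the node's action
    return node  # a DispatchNode is a valid thing for the framework to wait on
end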
Dispatcher.run!
— Method.run!(exec, output_nodes, input_nodes; input_map, throw_error) -> DispatchResult
Create a graph, ending in output_nodes, and using input_nodes/input_map to replace nodes with fixed values (and ignoring nodes for which all paths descend to input_nodes), then execute it.
Arguments
exec::Executor: the executor which will execute the graph
graph::DispatchGraph: the graph which will be executed
output_nodes::AbstractArray{T<:DispatchNode}: the nodes whose results we are interested in
input_nodes::AbstractArray{T<:DispatchNode}: "root" nodes of the graph which will be replaced with their fetched values (dependencies of these nodes are not included in the graph)
Keyword Arguments
input_map::Associative=Dict{DispatchNode, Any}(): dict keys are "root" nodes of the subgraph which will be replaced with the dict values (dependencies of these nodes are not included in the graph)
throw_error::Bool: whether to throw any DependencyErrors immediately (see dispatch!(::Executor, ::DispatchGraph) for more information)
Returns
Vector{DispatchResult}: an array containing a DispatchResult for each node in output_nodes, in that order
Throws
ExecutorError: if the constructed graph contains a cycle
CompositeException/DependencyError: see documentation for dispatch!(::Executor, ::DispatchGraph)
Dispatcher.run!
— Method.run!(exec::Executor, graph::DispatchGraph; kwargs...)
The run! function prepares a DispatchGraph for dispatch and then dispatches run!(::DispatchNode) calls for all nodes in its graph.
Users will almost never want to add methods to this function for different Executor subtypes; overriding dispatch!(::Executor, ::DispatchGraph) is the preferred pattern.
Return an array containing a Result{DispatchNode, DependencyError} for each leaf node.
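For example, using an AsyncExecutor (described below):
exec = AsyncExecutor()
a = Op(() -> 2)
b = Op(x -> x * 10, a)
results = run!(exec, DispatchGraph([a, b]))  # one Result per leaf node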
Dispatcher.prepare!
— Method.prepare!(exec::Executor, graph::DispatchGraph)
This function prepares a context for execution by calling prepare!(::DispatchNode) on each node.
Dispatcher.dispatch!
— Method.dispatch!(exec::Executor, graph::DispatchGraph; throw_error=true) -> Vector
The default dispatch! method uses asyncmap over all nodes in the context to call dispatch!(exec, node). These dispatch! calls for each node are wrapped in various retry and error handling methods.
Wrapping Details
All nodes are wrapped in a try/catch which waits on the value returned from the dispatch!(exec, node) call. Any errors are caught and used to create DependencyErrors, which are thrown. If no errors are produced, the node is returned.
NOTE: All errors thrown by trying to run dispatch!(exec, node) are wrapped in a DependencyError.
The aforementioned wrapper function is used in a retry wrapper to rerun failed nodes (up to some limit). The wrapped function will only be retried if the error produced by dispatch!(::Executor, ::DispatchNode) passes one of the retry functions specific to that Executor. By default the AsyncExecutor has no retry_on functions and the ParallelExecutor only has retry_on functions related to the loss of a worker process during execution.
A node may enter a failed state if it exits the retry wrapper with an exception. This may occur if an exception is thrown while executing a node and it does not pass any of the retry_on conditions for the Executor, or if too many attempts to run the node have been made. In the situation where a node has entered a failed state and the node is an Op, the op.result is set to the DependencyError, signifying the node's failure to any dependent nodes. Finally, if throw_error is true, the DependencyError will be immediately thrown in the current process without allowing other nodes to finish. If throw_error is false, the DependencyError is not thrown and it will be returned in the array of passing and failing nodes.
Arguments
exec::Executor: the executor we're running
graph::DispatchGraph: the context of nodes to run
Keyword Arguments
throw_error::Bool=true: whether or not to throw the DependencyError for failed nodes
Returns
Vector{Union{DispatchNode, DependencyError}}: a list of DispatchNodes, or DependencyErrors for failed nodes
Throws
dispatch! has the same behaviour on exceptions as asyncmap and pmap. In Julia 0.5 this will throw a CompositeException containing DependencyErrors, while in Julia 0.6 it will simply throw the first DependencyError.
Usage
Example 1
Assuming we have some uncaught application error:
exec = AsyncExecutor()
n1 = Op(() -> 3)
n2 = Op(() -> 4)
failing_node = Op(() -> throw(ErrorException("ApplicationError")))
dep_node = Op(n -> println(n), failing_node) # This node will fail as well
graph = DispatchGraph([n1, n2, failing_node, dep_node])
Then dispatch!(exec, graph) will throw a DependencyError, and dispatch!(exec, graph; throw_error=false) will return an array of passing nodes and DependencyErrors (i.e., [n1, n2, DependencyError(...), DependencyError(...)]).
Example 2
Now if we want to retry our node on certain errors we can do:
exec = AsyncExecutor(5, [e -> isa(e, HttpError) && e.status == "503"])
n1 = Op(() -> 3)
n2 = Op(() -> 4)
http_node = Op(() -> http_get(...))
graph = DispatchGraph([n1, n2, http_node])
Assuming that the http_get function does not error 5 times, the call to dispatch!(exec, graph) will return [n1, n2, http_node]. If the http_get function either:
fails with a different status code,
fails with something other than an HttpError, or
throws an HttpError with status "503" more than 5 times,
then we'll see the same failure behaviour as in the previous example.
Dispatcher.run_inner_node!
— Method.run_inner_node!(exec::Executor, node::DispatchNode, id::Int)
Run the DispatchNode in the DispatchGraph at position id. Any error thrown during the node's execution is caught and wrapped in a DependencyError.
Typical Executor implementations should not need to override this.
Dispatcher.retries
— Method.retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
Dispatcher.retry_on
— Method.retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
AsyncExecutor
Dispatcher.AsyncExecutor
— Type.AsyncExecutor is an Executor which schedules a local Julia Task for each DispatchNode and waits for them to complete. AsyncExecutor's dispatch!(::AsyncExecutor, ::DispatchNode) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.
Dispatcher.AsyncExecutor
— Method.AsyncExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> AsyncExecutor
retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise).
Return a new AsyncExecutor.
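For example, to retry each failed node up to three times, but only on ArgumentErrors:
exec = AsyncExecutor(3, [e -> isa(e, ArgumentError)])
retries(exec)   # 3
retry_on(exec)  # the one-element predicate vector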
Dispatcher.dispatch!
— Method.dispatch!(exec::AsyncExecutor, node::DispatchNode) -> Task
dispatch! takes the AsyncExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within an @async block and the resulting Task is returned. This is the defining method of AsyncExecutor.
Dispatcher.retries
— Method.retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int
Return the number of retries per node.
Dispatcher.retry_on
— Method.retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}
Return the array of retry conditions.
ParallelExecutor
Dispatcher.ParallelExecutor
— Type.ParallelExecutor is an Executor which creates a Julia Task for each DispatchNode, spawns each of those tasks on the processes available to Julia, and waits for them to complete. ParallelExecutor's dispatch!(::ParallelExecutor, ::DispatchGraph) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.
ParallelExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> ParallelExecutor
retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). Returns a new ParallelExecutor.
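A sketch of typical setup (worker processes must exist and have Dispatcher loaded before dispatching):
addprocs(2)                  # make two worker processes available
@everywhere using Dispatcher
exec = ParallelExecutor()
a = Op(() -> myid())         # myid() shows which process ran the node
run!(exec, DispatchGraph([a]))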
Dispatcher.dispatch!
— Method.dispatch!(exec::ParallelExecutor, node::DispatchNode) -> Future
dispatch! takes the ParallelExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within an @spawn block and the resulting Future is returned. This is the defining method of ParallelExecutor.
Dispatcher.retries
— Method.retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int
Return the number of retries per node.
Dispatcher.retry_on
— Method.retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}
Return the array of retry conditions.
Errors
DependencyError
Dispatcher.DependencyError
— Type.DependencyError wraps any errors (and the corresponding traceback) that occur on the dependency of a given node.
This is important for passing failure conditions to dependent nodes after a failed number of retries.
NOTE: our trace field is a Union of Vector{Any} and StackTrace because we could be storing the traceback from a CompositeException (inside a RemoteException), which is of type Vector{Any}.
Base.summary
— Method.summary(de::DependencyError)
Returns a string representation of the error with only the internal Exception type and the id.