API
Nodes
DispatchNode
Dispatcher.DispatchNode — Type.
A DispatchNode represents a unit of computation that can be run. A DispatchNode may depend on other DispatchNodes, which are returned from the dependencies function.
Dispatcher.get_label — Method.
get_label(node::DispatchNode) -> String
Returns a node's label. By default, DispatchNodes do not support labels, so this method will error.
Dispatcher.set_label! — Method.
set_label!(node::DispatchNode, label)
Sets a node's label. By default, DispatchNodes do not support labels, so this method will error. Actual method implementations should return their second argument.
Dispatcher.has_label — Method.
has_label(node::DispatchNode) -> Bool
Returns true or false as to whether the node has a label (ie: a get_label(::DispatchNode) method).
Dispatcher.dependencies — Method.
dependencies(node::DispatchNode) -> Tuple{Vararg{DispatchNode}}
Return all dependencies which must be ready before executing this node. Unless given a dependencies method, a DispatchNode will be assumed to have no dependencies.
Dispatcher.prepare! — Method.
prepare!(node::DispatchNode)
Execute some action on a node before dispatching nodes via an Executor. The default method performs no action.
Dispatcher.run! — Method.
run!(node::DispatchNode)
Execute a node's action as part of dispatch. The default method performs no action.
Base.isready — Method.
isready(node::DispatchNode) -> Bool
Determine whether a node has an available result. The default method assumes no synchronization is involved in retrieving that result.
Base.wait — Method.
wait(node::DispatchNode)
Block the current task until a node has a result available.
Base.fetch — Method.
fetch(node::DispatchNode) -> Any
Fetch a node's result if available, blocking until it is available. All subtypes of DispatchNode should implement this, so the default method throws an error.
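The interface above can be sketched with a minimal custom node. ConstNode below is a hypothetical example type (not part of Dispatcher) that wraps a constant value; only fetch is strictly required, while dependencies and isready override the defaults described above:

```julia
using Dispatcher
import Dispatcher: dependencies
import Base: fetch, isready

# Hypothetical example node: wraps a constant value.
struct ConstNode{T} <: DispatchNode
    value::T
end

# No upstream nodes must be ready before this node runs.
dependencies(::ConstNode) = ()

# The result is always available; no synchronization is needed.
isready(::ConstNode) = true

# Every DispatchNode subtype must implement fetch.
fetch(node::ConstNode) = node.value
```

With these definitions, fetch(ConstNode(42)) returns 42, while prepare! and run! fall back to the no-op defaults; has_label(node) is false since no get_label method is defined.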
Op
Dispatcher.Op — Type.
An Op is a DispatchNode which wraps a function which is executed when the Op is run. The result of that function call is stored in the result DeferredFuture. Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. This is the most common DispatchNode.
Dispatcher.Op — Method.
Op(func::Function, args...; kwargs...) -> Op
Construct an Op which represents the delayed computation of func(args...; kwargs...). Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. The default label of an Op is the name of func.
Dispatcher.@op — Macro.
@op func(...)
The @op macro makes it more convenient to construct Op nodes. It translates a function call into an Op call, effectively deferring the computation.
a = @op sort(1:10; rev=true)
is equivalent to
a = Op(sort, 1:10; rev=true)
Dispatcher.get_label — Method.
get_label(op::Op) -> String
Returns the op.label.
Dispatcher.set_label! — Method.
set_label!(op::Op, label::AbstractString)
Set the op's label. Returns its second argument.
Dispatcher.has_label — Method.
has_label(::Op) -> Bool
Always return true as an Op will always have a label.
Dispatcher.dependencies — Method.
dependencies(op::Op) -> Tuple{Vararg{DispatchNode}}
Return all dependencies which must be ready before executing this Op. This will be all DispatchNodes in the Op's function args and kwargs.
Dispatcher.prepare! — Method.
prepare!(op::Op)
Replace an Op's result field with a fresh, empty one.
Dispatcher.run! — Method.
run!(op::Op)
Fetch an Op's dependencies and execute its function. Store the result in its result::DeferredFuture field.
Base.isready — Method.
isready(op::Op) -> Bool
Determine whether an Op has an available result.
Base.wait — Method.
wait(op::Op)
Wait until an Op has an available result.
Base.fetch — Method.
fetch(op::Op) -> Any
Return the result of the Op. Block until it is available. Throw DependencyError in the event that the result is a DependencyError.
Base.summary — Method.
summary(op::Op)
Returns a string representation of the Op with its label and the args/kwargs types.
NOTE: if an arg/kwarg is a DispatchNode with a label it will be printed with that arg.
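Putting the Op interface together, a minimal sketch (using only the constructors and accessors documented above; dependencies are inferred from the arguments):

```julia
using Dispatcher

a = @op rand(10)           # label defaults to "rand"
b = @op sort(a; rev=true)  # `a` becomes a dependency of `b`

set_label!(b, "sorted")
@assert get_label(b) == "sorted"
@assert a in dependencies(b)

exec = AsyncExecutor()
run!(exec, DispatchGraph([a, b]))
println(fetch(b))
```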
DataNode
Dispatcher.DataNode — Type.
A DataNode is a DispatchNode which wraps a piece of static data.
Base.fetch — Method.
fetch{T}(node::DataNode{T}) -> T
Immediately return the data contained in a DataNode.
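For example (a short sketch, assuming the default DataNode(data) constructor):

```julia
using Dispatcher

data = DataNode([3, 1, 2])        # wrap static data in a node
@assert fetch(data) == [3, 1, 2]  # returns immediately, no dispatch needed

sorted = @op sort(data)  # a DataNode can be a dependency like any other node
```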
IndexNode
Dispatcher.IndexNode — Type.
An IndexNode refers to an element of the return value of a DispatchNode. It is meant to handle multiple return values from a DispatchNode.
Example:
n1, n2 = Op(() -> divrem(5, 2))
run!(exec, [n1, n2])
@assert fetch(n1) == 2
@assert fetch(n2) == 1
In this example, n1 and n2 are created as IndexNodes pointing to the Op at index 1 and index 2 respectively.
Dispatcher.IndexNode — Method.
IndexNode(node::DispatchNode, index) -> IndexNode
Create a new IndexNode referring to the result of node at index.
Dispatcher.dependencies — Method.
dependencies(node::IndexNode) -> Tuple{DispatchNode}
Return the dependency that this node will fetch data (at a certain index) from.
Dispatcher.prepare! — Method.
prepare!(node::IndexNode)
Replace an IndexNode's result field with a fresh, empty one.
Dispatcher.run! — Method.
run!(node::IndexNode) -> DeferredFuture
Fetch data from the IndexNode's parent at the IndexNode's index, performing the indexing operation on the process where the data lives. Store the data from that index in a DeferredFuture in the IndexNode.
Base.isready — Method.
isready(node::IndexNode) -> Bool
Determine whether an IndexNode has an available result.
Base.wait — Method.
wait(node::IndexNode)
Wait until an IndexNode has an available result.
Base.fetch — Method.
fetch(node::IndexNode) -> Any
Return the stored result of indexing.
Base.summary — Method.
summary(node::IndexNode)
Returns a string representation of the IndexNode with a summary of the wrapped node and the node index.
CollectNode
Dispatcher.CollectNode — Type.
CollectNode{T<:DispatchNode}(nodes::Vector{T}) -> CollectNode{T}
Create a node which gathers an array of nodes and stores an array of their results in its result field.
CollectNode(nodes) -> CollectNode{DispatchNode}
Create a CollectNode from any iterable of nodes.
Dispatcher.CollectNode — Method.
CollectNode{T<:DispatchNode}(nodes::Vector{T}) -> CollectNode{T}
Create a node which gathers an array of nodes and stores an array of their results in its result field.
CollectNode(nodes) -> CollectNode{DispatchNode}
Create a CollectNode from any iterable of nodes.
Dispatcher.get_label — Method.
get_label(node::CollectNode) -> String
Returns the node.label.
Dispatcher.set_label! — Method.
set_label!(node::CollectNode, label::AbstractString) -> AbstractString
Set the node's label. Returns its second argument.
Dispatcher.has_label — Method.
has_label(::CollectNode) -> Bool
Always return true as a CollectNode will always have a label.
Dispatcher.dependencies — Method.
dependencies{T<:DispatchNode}(node::CollectNode{T}) -> Vector{T}
Return the nodes this node depends on, which are the nodes it will collect.
Dispatcher.prepare! — Method.
prepare!(node::CollectNode)
Initialize a CollectNode with a fresh result DeferredFuture.
Dispatcher.run! — Method.
run!(node::CollectNode)
Collect all of a CollectNode's dependencies' results into a Vector and store that in this node's result field. Returns nothing.
Base.isready — Method.
isready(node::CollectNode) -> Bool
Determine whether a CollectNode has an available result.
Base.wait — Method.
wait(node::CollectNode)
Block until a CollectNode has an available result.
Base.fetch — Method.
fetch(node::CollectNode) -> Vector
Return the result of the collection. Block until it is available.
Base.summary — Method.
summary(node::CollectNode)
Returns a string representation of the CollectNode with its label.
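A small sketch of gathering several Ops into one node (using the run!(exec, output_nodes) form shown in the IndexNode example above):

```julia
using Dispatcher

ops = [@op(rand(3)) for _ in 1:4]  # four independent Ops
gathered = CollectNode(ops)        # gathers all four results into one Vector

exec = AsyncExecutor()
run!(exec, [gathered])
@assert length(fetch(gathered)) == 4  # results in the same order as `ops`
```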
Graph
DispatchGraph
Dispatcher.DispatchGraph — Type.
DispatchGraph wraps a directed graph from LightGraphs and a bidirectional dictionary mapping between DispatchNode instances and vertex numbers in the graph.
Dispatcher.nodes — Method.
nodes(graph::DispatchGraph) ->
Return an iterable of all nodes stored in the DispatchGraph.
Base.length — Method.
length(graph::DispatchGraph) -> Integer
Return the number of nodes in the graph.
Base.push! — Method.
push!(graph::DispatchGraph, node::DispatchNode) -> DispatchGraph
Add a node to the graph and return the graph.
LightGraphs.SimpleGraphs.add_edge! — Method.
add_edge!(graph::DispatchGraph, parent::DispatchNode, child::DispatchNode) -> Bool
Add an edge to the graph from parent to child. Return whether the operation was successful.
Base.:== — Method.
graph1::DispatchGraph == graph2::DispatchGraph
Determine whether two graphs have the same nodes and edges. This is an expensive operation.
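A sketch of building and extending a graph with the methods above (nodes are listed explicitly, as in the dispatch! usage examples later in this document):

```julia
using Dispatcher

a = @op rand(5)
b = @op sum(a)

graph = DispatchGraph([a, b])
@assert length(graph) == 2

# Nodes and edges can also be added explicitly:
c = @op string(b)
push!(graph, c)
add_edge!(graph, b, c)  # returns true on success
```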
Executors
Executor
Dispatcher.Executor — Type.
An Executor handles execution of DispatchGraphs.
A type T <: Executor must implement dispatch!(::T, ::DispatchNode) and may optionally implement dispatch!(::T, ::DispatchGraph; throw_error=true).
The function call tree will look like this when an executor is run:
run!(exec, context)
    prepare!(exec, context)
        prepare!(nodes[i])
    dispatch!(exec, context)
        dispatch!(exec, nodes[i])
            run!(nodes[i])
NOTE: Currently, it is expected that dispatch!(::T, ::DispatchNode) returns something to wait on (ie: Task, Future, Channel, DispatchNode, etc.)
Dispatcher.run! — Method.
run!(exec, output_nodes, input_nodes; input_map, throw_error) -> DispatchResult
Create a graph, ending in output_nodes, and using input_nodes/input_map to replace nodes with fixed values (and ignoring nodes for which all paths descend to input_nodes), then execute it.
Arguments
- exec::Executor: the executor which will execute the graph
- output_nodes::AbstractArray{T<:DispatchNode}: the nodes whose results we are interested in
- input_nodes::AbstractArray{T<:DispatchNode}: "root" nodes of the graph which will be replaced with their fetched values (dependencies of these nodes are not included in the graph)
Keyword Arguments
- input_map::Associative=Dict{DispatchNode, Any}(): dict keys are "root" nodes of the subgraph which will be replaced with the dict values (dependencies of these nodes are not included in the graph)
- throw_error::Bool: whether to throw any DependencyErrors immediately (see dispatch!(::Executor, ::DispatchGraph) for more information)
Returns
- Vector{DispatchResult}: an array containing a DispatchResult for each node in output_nodes, in that order.
Throws
- ExecutorError: if the constructed graph contains a cycle
- CompositeException/DependencyError: see documentation for dispatch!(::Executor, ::DispatchGraph)
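Based on the signature above, a sketch of replacing an input node with a fixed value via input_map (this assumes the positional input_nodes argument defaults to an empty collection when only input_map is given):

```julia
using Dispatcher

x = @op rand(3)  # upstream node to be replaced
y = @op sum(x)

exec = AsyncExecutor()

# Substitute a fixed value for `x`; `x`'s own dependencies are ignored.
run!(exec, [y]; input_map=Dict{DispatchNode, Any}(x => [1.0, 2.0, 3.0]))
@assert fetch(y) == 6.0
```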
Dispatcher.run! — Method.
run!(exec::Executor, graph::DispatchGraph; kwargs...)
The run! function prepares a DispatchGraph for dispatch and then dispatches run!(::DispatchNode) calls for all nodes in its graph.
Users will almost never want to add methods to this function for different Executor subtypes; overriding dispatch!(::Executor, ::DispatchGraph) is the preferred pattern.
Return an array containing a Result{DispatchNode, DependencyError} for each leaf node.
Dispatcher.prepare! — Method.
prepare!(exec::Executor, graph::DispatchGraph)
This function prepares a context for execution. Calls prepare!(::DispatchNode) on each node.
Dispatcher.dispatch! — Method.
dispatch!(exec::Executor, graph::DispatchGraph; throw_error=true) -> Vector
The default dispatch! method uses asyncmap over all nodes in the context to call dispatch!(exec, node). These dispatch! calls for each node are wrapped in various retry and error handling methods.
Wrapping Details
- All nodes are wrapped in a try/catch which waits on the value returned from the dispatch!(exec, node) call. Any errors are caught and used to create DependencyErrors, which are thrown. If no errors are produced then the node is returned. NOTE: All errors thrown by trying to run dispatch!(exec, node) are wrapped in a DependencyError.
- The aforementioned wrapper function is used in a retry wrapper to rerun failed nodes (up to some limit). The wrapped function will be retried only if the error produced by dispatch!(::Executor, ::DispatchNode) passes one of the retry functions specific to that Executor. By default the AsyncExecutor has no retry_on functions and the ParallelExecutor only has retry_on functions related to the loss of a worker process during execution.
- A node may enter a failed state if it exits the retry wrapper with an exception. This may occur if an exception is thrown while executing a node and it does not pass any of the retry_on conditions for the Executor, or if too many attempts to run the node have been made. In the situation where a node has entered a failed state and the node is an Op, the op.result is set to the DependencyError, signifying the node's failure to any dependent nodes. Finally, if throw_error is true then the DependencyError will be immediately thrown in the current process without allowing other nodes to finish. If throw_error is false then the DependencyError is not thrown and it will be returned in the array of passing and failing nodes.
Arguments
- exec::Executor: the executor we're running
- graph::DispatchGraph: the context of nodes to run
Keyword Arguments
- throw_error::Bool=true: whether or not to throw the DependencyError for failed nodes
Returns
- Vector{Union{DispatchNode, DependencyError}}: a list of DispatchNodes, or DependencyErrors for failed nodes
Throws
- dispatch! has the same behaviour on exceptions as asyncmap and pmap. In 0.5 this will throw a CompositeException containing DependencyErrors, while in 0.6 this will simply throw the first DependencyError.
Usage
Example 1
Assuming we have some uncaught application error:
exec = AsyncExecutor()
n1 = Op(() -> 3)
n2 = Op(() -> 4)
failing_node = Op(() -> throw(ErrorException("ApplicationError")))
dep_node = Op(n -> println(n), failing_node)  # This node will fail as well
graph = DispatchGraph([n1, n2, failing_node, dep_node])
Then dispatch!(exec, graph) will throw a DependencyError and dispatch!(exec, graph; throw_error=false) will return an array of passing nodes and the DependencyErrors (ie: [n1, n2, DependencyError(...), DependencyError(...)]).
Example 2
Now if we want to retry our node on certain errors we can do:
exec = AsyncExecutor(5, [e -> isa(e, HttpError) && e.status == "503"])
n1 = Op(() -> 3)
n2 = Op(() -> 4)
http_node = Op(() -> http_get(...))
graph = DispatchGraph([n1, n2, http_node])
Assuming that the http_get function does not error 5 times, the call to dispatch!(exec, graph) will return [n1, n2, http_node]. If the http_get function either:
- fails with a different status code,
- fails with something other than an HttpError, or
- throws an HttpError with status "503" more than 5 times,
then we'll see the same failure behaviour as in the previous example.
Dispatcher.run_inner_node! — Method.
run_inner_node!(exec::Executor, node::DispatchNode, id::Int)
Run the DispatchNode in the DispatchGraph at position id. Any error thrown during the node's execution is caught and wrapped in a DependencyError.
Typical Executor implementations should not need to override this.
Dispatcher.retries — Method.
retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
Dispatcher.retry_on — Method.
retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
AsyncExecutor
Dispatcher.AsyncExecutor — Type.
AsyncExecutor is an Executor which schedules a local Julia Task for each DispatchNode and waits for them to complete. AsyncExecutor's dispatch!(::AsyncExecutor, ::DispatchNode) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.
Dispatcher.AsyncExecutor — Method.
AsyncExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> AsyncExecutor
retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise).
Return a new AsyncExecutor.
Dispatcher.dispatch! — Method.
dispatch!(exec::AsyncExecutor, node::DispatchNode) -> Task
dispatch! takes the AsyncExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within a @async block and the resulting Task is returned. This is the defining method of AsyncExecutor.
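A small end-to-end sketch with an AsyncExecutor, using only API documented above:

```julia
using Dispatcher

exec = AsyncExecutor()  # defaults: retries=5, no retry_on predicates

a = @op rand(100)
b = @op maximum(a)
c = @op minimum(a)
d = Op((hi, lo) -> hi - lo, b, c)  # b and c become dependencies of d
set_label!(d, "range")

run!(exec, DispatchGraph([a, b, c, d]))
@assert fetch(d) >= 0.0
```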
Dispatcher.retries — Method.
retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int
Return the number of retries per node.
Dispatcher.retry_on — Method.
retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}
Return the array of retry conditions.
ParallelExecutor
Dispatcher.ParallelExecutor — Type.
ParallelExecutor is an Executor which creates a Julia Task for each DispatchNode, spawns each of those tasks on the processes available to Julia, and waits for them to complete. ParallelExecutor's dispatch!(::ParallelExecutor, ::DispatchGraph) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.
ParallelExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> ParallelExecutor
retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). Returns a new ParallelExecutor.
Dispatcher.dispatch! — Method.
dispatch!(exec::ParallelExecutor, node::DispatchNode) -> Future
dispatch! takes the ParallelExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within an @spawn block and the resulting Future is returned. This is the defining method of ParallelExecutor.
Dispatcher.retries — Method.
retries(exec::Executor) -> Int
Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int
Return the number of retries per node.
Dispatcher.retry_on — Method.
retry_on(exec::Executor) -> Vector{Function}
Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}
Return the array of retry conditions.
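Usage mirrors AsyncExecutor; a sketch assuming worker processes have been added and Dispatcher is loaded on all of them:

```julia
addprocs(2)       # add two worker processes before loading the package
using Dispatcher  # so the code is available on all workers

exec = ParallelExecutor()

squares = [@op(i^2) for i in 1:10]  # independent Ops, spawned across workers
total = CollectNode(squares)

run!(exec, [total])
@assert sum(fetch(total)) == 385
```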
Errors
DependencyError
Dispatcher.DependencyError — Type.
DependencyError wraps any errors (and the corresponding traceback) that occur on the dependency of a given node.
This is important for passing failure conditions to dependent nodes after a failed number of retries.
NOTE: our trace field is a Union of Vector{Any} and StackTrace because we could be storing the traceback from a CompositeException (inside a RemoteException) which is of type Vector{Any}
Base.summary — Method.
summary(de::DependencyError)
Returns a string representation of the error with only the internal Exception type and the id.
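A sketch of inspecting DependencyErrors from a failing graph, following the dispatch! semantics documented above:

```julia
using Dispatcher

exec = AsyncExecutor()
bad = Op(() -> error("boom"))
dependent = Op(x -> x + 1, bad)  # fails because its dependency failed
graph = DispatchGraph([bad, dependent])

# throw_error=false returns failures in the result array instead of throwing.
results = dispatch!(exec, graph; throw_error=false)
for r in results
    isa(r, DependencyError) && println(summary(r))
end
```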