API

Nodes

DispatchNode

A DispatchNode represents a unit of computation that can be run. A DispatchNode may depend on other DispatchNodes, which are returned from the dependencies function.

source
get_label(node::DispatchNode) -> String

Returns a node's label. By default, DispatchNodes do not support labels, so this method will error.

source
set_label!(node::DispatchNode, label)

Sets a node's label. By default, DispatchNodes do not support labels, so this method will error. Actual method implementations should return their second argument.

source
has_label(node::DispatchNode) -> Bool

Returns true if the node has a label (ie: a get_label(::DispatchNode) method) and false otherwise.

source
dependencies(node::DispatchNode) -> Tuple{Vararg{DispatchNode}}

Return all dependencies which must be ready before executing this node. Unless given a dependencies method, a DispatchNode will be assumed to have no dependencies.

source
prepare!(node::DispatchNode)

Execute some action on a node before dispatching nodes via an Executor. The default method performs no action.

source
run!(node::DispatchNode)

Execute a node's action as part of dispatch. The default method performs no action.

source
isready(node::DispatchNode) -> Bool

Determine whether a node has an available result. The default method assumes no synchronization is involved in retrieving that result.

source
wait(node::DispatchNode)

Block the current task until a node has a result available.

source
fetch(node::DispatchNode) -> Any

Fetch a node's result if available, blocking until it is available. All subtypes of DispatchNode should implement this, so the default method throws an error.

source
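As an illustration, here is a minimal sketch of a custom node type built on the interface above. ConstNode is a hypothetical type (not part of Dispatcher) wrapping a value that is always ready; Julia 0.6+ struct syntax is assumed.

using Dispatcher

struct ConstNode <: DispatchNode
    value::Any
end

# A constant has no dependencies and needs no synchronization.
Dispatcher.dependencies(node::ConstNode) = ()
Base.isready(node::ConstNode) = true
Base.wait(node::ConstNode) = nothing
Base.fetch(node::ConstNode) = node.value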

Op

An Op is a DispatchNode which wraps a function which is executed when the Op is run. The result of that function call is stored in the result DeferredFuture. Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. This is the most common DispatchNode.

source
Op(func::Function, args...; kwargs...) -> Op

Construct an Op which represents the delayed computation of func(args...; kwargs...). Any DispatchNodes which appear in the args or kwargs values will be noted as dependencies. The default label of an Op is the name of func.

source
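For example, passing one Op as an argument to another records it as a dependency. A sketch with hypothetical nodes, assuming the accessor functions documented above are available:

a = Op(rand, 4)
b = Op(sum, a)          # `a` becomes a dependency of `b`

get_label(b)            # "sum", the name of the wrapped function
set_label!(b, "total")  # replace the default label
dependencies(b)         # (a,)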
@op func(...)

The @op macro makes it more convenient to construct Op nodes. It translates a function call into an Op call, effectively deferring the computation.

a = @op sort(1:10; rev=true)

is equivalent to

a = Op(sort, 1:10; rev=true)
source
get_label(op::Op) -> String

Returns the op.label.

source
set_label!(op::Op, label::AbstractString)

Set the op's label. Returns its second argument.

source
has_label(::Op) -> Bool

Always return true as an Op will always have a label.

source
dependencies(op::Op) -> Tuple{Vararg{DispatchNode}}

Return all dependencies which must be ready before executing this Op. This will be all DispatchNodes in the Op's function args and kwargs.

source
prepare!(op::Op)

Replace an Op's result field with a fresh, empty one.

source
run!(op::Op)

Fetch an Op's dependencies and execute its function. Store the result in its result::DeferredFuture field.

source
isready(op::Op) -> Bool

Determine whether an Op has an available result.

source
wait(op::Op)

Wait until an Op has an available result.

source
fetch(op::Op) -> Any

Return the result of the Op. Block until it is available. Throw DependencyError in the event that the result is a DependencyError.

source
summary(op::Op)

Returns a string representation of the Op with its label and the args/kwargs types.

NOTE: if an arg/kwarg is a DispatchNode with a label it will be printed with that label.

source

DataNode

A DataNode is a DispatchNode which wraps a piece of static data.

source
fetch{T}(node::DataNode{T}) -> T

Immediately return the data contained in a DataNode.

source
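A brief sketch of the expected behaviour, assuming the default one-argument constructor:

d = DataNode(42)
fetch(d)  # returns 42 immediately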

IndexNode

An IndexNode refers to an element of the return value of a DispatchNode. It is meant to handle multiple return values from a DispatchNode.

Example:

n1, n2 = Op(() -> divrem(5, 2))
run!(exec, [n1, n2])

@assert fetch(n1) == 2
@assert fetch(n2) == 1

In this example, n1 and n2 are created as IndexNodes pointing to the Op at index 1 and index 2 respectively.

source
IndexNode(node::DispatchNode, index) -> IndexNode

Create a new IndexNode referring to the result of node at index.

source
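For example, the IndexNodes from the divrem example above could be constructed explicitly; this sketch is equivalent to the destructuring form:

op = Op(() -> divrem(5, 2))
quotient  = IndexNode(op, 1)   # will fetch 2
remainder = IndexNode(op, 2)   # will fetch 1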
dependencies(node::IndexNode) -> Tuple{DispatchNode}

Return the dependency that this node will fetch data (at a certain index) from.

source
prepare!(node::IndexNode)

Replace an IndexNode's result field with a fresh, empty one.

source
run!(node::IndexNode) -> DeferredFuture

Fetch data from the IndexNode's parent at the IndexNode's index, performing the indexing operation on the process where the data lives. Store the data from that index in a DeferredFuture in the IndexNode.

source
isready(node::IndexNode) -> Bool

Determine whether an IndexNode has an available result.

source
wait(node::IndexNode)

Wait until an IndexNode has an available result.

source
fetch(node::IndexNode) -> Any

Return the stored result of indexing.

source
summary(node::IndexNode)

Returns a string representation of the IndexNode with a summary of the wrapped node and the node index.

source

CollectNode

CollectNode{T<:DispatchNode}(nodes::Vector{T}) -> CollectNode{T}

Create a node which gathers an array of nodes and stores an array of their results in its result field.

source
CollectNode(nodes) -> CollectNode{DispatchNode}

Create a CollectNode from any iterable of nodes.

source
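A sketch of typical use with hypothetical nodes:

ops = [Op(() -> i) for i in 1:3]
gathered = CollectNode(ops)
# after the graph has been run:
# fetch(gathered) == [1, 2, 3]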
get_label(node::CollectNode) -> String

Returns the node.label.

source
set_label!(node::CollectNode, label::AbstractString) -> AbstractString

Set the node's label. Returns its second argument.

source
has_label(::CollectNode) -> Bool

Always return true as a CollectNode will always have a label.

source
dependencies{T<:DispatchNode}(node::CollectNode{T}) -> Vector{T}

Return the nodes which this node depends on and whose results it will collect.

source
prepare!(node::CollectNode)

Initialize a CollectNode with a fresh result DeferredFuture.

source
run!(node::CollectNode)

Collect all of a CollectNode's dependencies' results into a Vector and store that in this node's result field. Returns nothing.

source
isready(node::CollectNode) -> Bool

Determine whether a CollectNode has an available result.

source
wait(node::CollectNode)

Block until a CollectNode has an available result.

source
fetch(node::CollectNode) -> Vector

Return the result of the collection. Block until it is available.

source
summary(node::CollectNode)

Returns a string representation of the CollectNode with its label.

source

Graph

DispatchGraph

DispatchGraph wraps a directed graph from LightGraphs and a bidirectional dictionary mapping between DispatchNode instances and vertex numbers in the graph.

source
nodes(graph::DispatchGraph)

Return an iterable of all nodes stored in the DispatchGraph.

source
length(graph::DispatchGraph) -> Integer

Return the number of nodes in the graph.

source
push!(graph::DispatchGraph, node::DispatchNode) -> DispatchGraph

Add a node to the graph and return the graph.

source
add_edge!(graph::DispatchGraph, parent::DispatchNode, child::DispatchNode) -> Bool

Add an edge to the graph from parent to child. Return whether the operation was successful.

source
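A sketch of building a graph by hand with hypothetical nodes. The vector constructor is the one used in the dispatch! examples below; edges between Ops are normally derived from their dependencies, so explicit add_edge! calls are only needed for ordering constraints that are not expressed through node arguments.

a = Op(() -> 1)
b = Op(x -> x + 1, a)        # `b` depends on `a`
graph = DispatchGraph([a, b])

c = Op(() -> 2)
push!(graph, c)              # add a standalone node
add_edge!(graph, c, b)       # require `c` to run before `b`
length(graph)                # 3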
graph1::DispatchGraph == graph2::DispatchGraph

Determine whether two graphs have the same nodes and edges. This is an expensive operation.

source

Executors

Executor

An Executor handles execution of DispatchGraphs.

A type T <: Executor must implement dispatch!(::T, ::DispatchNode) and may optionally implement dispatch!(::T, ::DispatchGraph; throw_error=true).

The function call tree will look like this when an executor is run:

run!(exec, context)
    prepare!(exec, context)
        prepare!(nodes[i])
    dispatch!(exec, context)
        dispatch!(exec, nodes[i])
            run!(nodes[i])

NOTE: Currently, it is expected that dispatch!(::T, ::DispatchNode) returns something to wait on (ie: Task, Future, Channel, DispatchNode, etc)

source
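For illustration, here is a minimal sketch of a custom executor satisfying this interface. SerialExecutor is a hypothetical type (not part of Dispatcher) that simply runs each node synchronously; Julia 0.6+ struct syntax is assumed.

using Dispatcher

struct SerialExecutor <: Executor end

# The defining method: run the node and return something that can be waited on.
function Dispatcher.dispatch!(exec::SerialExecutor, node::DispatchNode)
    run!(node)
    return node
end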
run!(exec, output_nodes, input_nodes; input_map, throw_error) -> DispatchResult

Create a graph, ending in output_nodes, and using input_nodes/input_map to replace nodes with fixed values (and ignoring nodes for which all paths descend to input_nodes), then execute it.

Arguments

  • exec::Executor: the executor which will execute the graph

  • graph::DispatchGraph: the graph which will be executed

  • output_nodes::AbstractArray{T<:DispatchNode}: the nodes whose results we are interested in

  • input_nodes::AbstractArray{T<:DispatchNode}: "root" nodes of the graph which will be replaced with their fetched values (dependencies of these nodes are not included in the graph)

Keyword Arguments

  • input_map::Associative=Dict{DispatchNode, Any}(): dict keys are "root" nodes of the subgraph which will be replaced with the dict values (dependencies of these nodes are not included in the graph)

  • throw_error::Bool: whether to throw any DependencyErrors immediately (see dispatch!(::Executor, ::DispatchGraph) for more information)

Returns

  • Vector{DispatchResult}: an array containing a DispatchResult for each node in output_nodes, in that order.

Throws

source
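A sketch of the intended usage with hypothetical nodes, assuming an empty input_nodes array may be passed when only input_map is used:

exec = AsyncExecutor()
a = Op(() -> 1)
b = Op(x -> x + 1, a)
c = Op(x -> x * 2, b)

# Compute `c`, substituting a fixed value for `a` instead of running it.
results = run!(exec, [c], DispatchNode[]; input_map=Dict(a => 10))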
run!(exec::Executor, graph::DispatchGraph; kwargs...)

The run! function prepares a DispatchGraph for dispatch and then dispatches run!(::DispatchNode) calls for all nodes in its graph.

Users will almost never want to add methods to this function for different Executor subtypes; overriding dispatch!(::Executor, ::DispatchGraph) is the preferred pattern.

Return an array containing a Result{DispatchNode, DependencyError} for each leaf node.

source
prepare!(exec::Executor, graph::DispatchGraph)

This function prepares a graph for execution by calling prepare!(::DispatchNode) on each of its nodes.

source
dispatch!(exec::Executor, graph::DispatchGraph; throw_error=true) -> Vector

The default dispatch! method uses asyncmap over all nodes in the context to call dispatch!(exec, node). These dispatch! calls for each node are wrapped in various retry and error handling methods.

Wrapping Details

  1. All nodes are wrapped in a try/catch which waits on the value returned from the dispatch!(exec, node) call. Any errors are caught and used to create DependencyErrors, which are thrown. If no errors are produced, the node is returned.

    NOTE: All errors thrown by trying to run dispatch!(exec, node) are wrapped in a DependencyError.

  2. The aforementioned wrapper function is used in a retry wrapper to rerun failed nodes (up to some limit). The wrapped function will only be retried if the error produced by dispatch!(::Executor, ::DispatchNode) passes one of the retry functions specific to that Executor. By default the AsyncExecutor has no retry_on functions and the ParallelExecutor only has retry_on functions related to the loss of a worker process during execution.

  3. A node may enter a failed state if it exits the retry wrapper with an exception. This can occur if an exception thrown while executing the node does not pass any of the Executor's retry_on conditions, or if too many attempts to run the node have been made. When a failed node is an Op, its op.result is set to the DependencyError, signalling the failure to any dependent nodes. Finally, if throw_error is true then the DependencyError is thrown immediately in the current process without allowing other nodes to finish; if throw_error is false then the DependencyError is not thrown and is instead returned in the array of passing and failing nodes.

Arguments

  • exec::Executor: the executor we're running

  • graph::DispatchGraph: the context of nodes to run

Keyword Arguments

  • throw_error::Bool=true: whether or not to throw the DependencyError for failed nodes

Returns

  • Vector{Union{DispatchNode, DependencyError}}: a list of DispatchNodes or DependencyErrors for failed nodes

Throws

  • dispatch! has the same behaviour on exceptions as asyncmap and pmap. In 0.5 this will throw a CompositeException containing DependencyErrors, while in 0.6 this will simply throw the first DependencyError.

Usage

Example 1

Assuming we have some uncaught application error:

exec = AsyncExecutor()
n1 = Op(() -> 3)
n2 = Op(() -> 4)
failing_node = Op(() -> throw(ErrorException("ApplicationError")))
dep_node = Op(n -> println(n), failing_node)  # This node will fail as well
graph = DispatchGraph([n1, n2, failing_node, dep_node])

Then dispatch!(exec, graph) will throw a DependencyError and dispatch!(exec, graph; throw_error=false) will return an array of passing nodes and the DependencyErrors (ie: [n1, n2, DependencyError(...), DependencyError(...)]).

Example 2

Now if we want to retry our node on certain errors we can do:

exec = AsyncExecutor(5, [e -> isa(e, HttpError) && e.status == "503"])
n1 = Op(() -> 3)
n2 = Op(() -> 4)
http_node = Op(() -> http_get(...))
graph = DispatchGraph([n1, n2, http_node])

Assuming that the http_get function does not error 5 times, the call to dispatch!(exec, graph) will return [n1, n2, http_node]. If the http_get function either:

  1. fails with a different status code

  2. fails with something other than an HttpError or

  3. throws an HttpError with status "503" more than 5 times

then we'll see the same failure behaviour as in the previous example.

source
run_inner_node!(exec::Executor, node::DispatchNode, id::Int)

Run the DispatchNode in the DispatchGraph at position id. Any error thrown during the node's execution is caught and wrapped in a DependencyError.

Typical Executor implementations should not need to override this.

source
retries(exec::Executor) -> Int

Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.

source
retry_on(exec::Executor) -> Vector{Function}

Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].

source

AsyncExecutor

AsyncExecutor is an Executor which schedules a local Julia Task for each DispatchNode and waits for them to complete. AsyncExecutor's dispatch!(::AsyncExecutor, ::DispatchNode) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.

source
AsyncExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> AsyncExecutor

retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise).

Return a new AsyncExecutor.

source
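For example, the following sketch (with hypothetical nodes) retries each failed node up to 3 times, but only when the failure was an ErrorException. It assumes the run! method documented above accepts just the executor and the output nodes.

exec = AsyncExecutor(3, [e -> isa(e, ErrorException)])

a = @op rand(10)
b = @op sum(a)
run!(exec, [b])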
dispatch!(exec::AsyncExecutor, node::DispatchNode) -> Task

dispatch! takes the AsyncExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within a @async block and the resulting Task is returned. This is the defining method of AsyncExecutor.

source
retries(exec::Executor) -> Int

Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.

source
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int

Return the number of retries per node.

source
retry_on(exec::Executor) -> Vector{Function}

Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].

source
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}

Return the array of retry conditions.

source

ParallelExecutor

ParallelExecutor is an Executor which creates a Julia Task for each DispatchNode, spawns each of those tasks on the processes available to Julia, and waits for them to complete. ParallelExecutor's dispatch!(::ParallelExecutor, ::DispatchGraph) method will complete as long as each DispatchNode's run!(::DispatchNode) method completes and there are no cycles in the computation graph.

ParallelExecutor(retries=5, retry_on::Vector{Function}=Function[]) -> ParallelExecutor

retries is the number of times the executor is to retry a failed node. retry_on is a vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). Returns a new ParallelExecutor.

source
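A sketch of typical use, assuming worker processes have already been added with addprocs and that Dispatcher is loaded on all of them:

exec = ParallelExecutor()

n = @op mapreduce(x -> x^2, +, 1:10000)
run!(exec, [n])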
dispatch!(exec::ParallelExecutor, node::DispatchNode) -> Future

dispatch! takes the ParallelExecutor and a DispatchNode to run. The run!(::DispatchNode) method on the node is called within an @spawn block and the resulting Future is returned. This is the defining method of ParallelExecutor.

source
retries(exec::Executor) -> Int

Return the number of retries an executor should perform while attempting to run a node before giving up. The default retries method returns 0.

source
retries(exec::Union{AsyncExecutor, ParallelExecutor}) -> Int

Return the number of retries per node.

source
retry_on(exec::Executor) -> Vector{Function}

Return the vector of predicates which accept an Exception and return true if a node can and should be retried (and false otherwise). The default retry_on method returns Function[].

source
retry_on(exec::Union{AsyncExecutor, ParallelExecutor}) -> Vector{Function}

Return the array of retry conditions.

source

Errors

DependencyError

DependencyError wraps any errors (and the corresponding traceback) that occur on a dependency of a given node.

This is important for passing failure conditions to dependent nodes after a node has exhausted its retries.

NOTE: the trace field is a Union of Vector{Any} and StackTrace because we could be storing the traceback from a CompositeException (inside a RemoteException), which is of type Vector{Any}.

source
summary(de::DependencyError)

Returns a string representation of the error containing only the wrapped Exception type and the id.

source
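A sketch of inspecting failures when errors are not rethrown, using the dispatch! keyword described above (exec and graph are hypothetical):

results = dispatch!(exec, graph; throw_error=false)
for r in results
    if isa(r, DependencyError)
        println("failed: ", summary(r))
    end
end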