API
DaskExecutor
DaskExecutor is an Executor which executes julia computations as scheduled by the python dask-scheduler. It can run computations either asynchronously or in parallel (if Workers are started on a julia cluster instead).
DaskExecutor's dispatch!(::DaskExecutor, ::Dispatcher.DispatchNode) method will complete as long as there are no cycles in the computation graph, the dask-scheduler remains online, and there is at least one Worker that is listening to the dask-scheduler.
DaskDistributedDispatcher.DaskExecutor — Method.
DaskExecutor(scheduler_address::String="127.0.0.1:8786")
Return a new DaskExecutor. The scheduler_address only needs to be provided if the dask-scheduler is running on a different machine or not on its default port (8786).
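For example, to connect to a scheduler on another machine (a minimal sketch; the remote address below is hypothetical):
exec = DaskExecutor()                  # scheduler on the default 127.0.0.1:8786
exec = DaskExecutor("10.0.2.10:8786")  # hypothetical address of a remote dask-scheduler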
NOTE: A dask-scheduler must be running at all times or the DaskExecutor execution will fail. If the scheduler is taken offline during execution for some reason, any remaining operations will fail to complete. Start a dask-scheduler from a terminal by typing dask-scheduler:
$ dask-scheduler
Start scheduler at 192.168.0.1:8786
Prerequisites
python 2.7 or 3.5
the python dask.distributed package (installation instructions here)
Note that using the dask-scheduler and executing computations in a distributed manner can add overhead for simple tasks. Consider using an AsyncExecutor or ParallelExecutor if possible. The advantage of using the dask-scheduler is that it schedules computations in a manner that is short-term efficient and long-term fair.
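For comparison, a minimal sketch of a simple graph run entirely in-process with Dispatcher's AsyncExecutor (no dask-scheduler or Workers required):
using Dispatcher
exec = AsyncExecutor()
a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)
results = run!(exec, DispatchGraph(c))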
Usage
The DaskExecutor can run either asynchronously with the Workers, or in parallel if Workers are spawned on separate julia processes in a cluster.
NOTE: Users must start up at least one Worker by pointing it to the dask-scheduler's address or else run! will hang indefinitely.
Examples
Running asynchronously:
# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes
Worker()
exec = DaskExecutor()
a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)
results = run!(exec, DispatchGraph(c))
fetch(unwrap(results[1])) # 4
Running in parallel:
# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes
addprocs(3)
@everywhere using DaskDistributedDispatcher
for i in 1:3
cond = @spawn Worker()
wait(cond)
end
exec = DaskExecutor()
a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)
results = run!(exec, DispatchGraph(c))
fetch(unwrap(results[1])) # 4
To delete all previously computed information from the workers:
reset!(exec)
Advanced Workflows
It is possible to bypass the DaskExecutor and use the Client directly to submit computations, cancel previously scheduled DispatchNodes, gather results, or replicate data across all workers. See Client for more details. It is recommended to start with a DaskExecutor and access its client field later if needed.
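For example, a minimal sketch of reaching the underlying Client of an existing executor (here `c` stands in for a previously submitted Op):
exec = DaskExecutor()
client = exec.client   # the executor's underlying Client
cancel(client, [c])    # e.g. cancel the previously submitted node `c`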
DaskDistributedDispatcher.reset! — Method.
reset!(exec::DaskExecutor)
Restarts the executor's Client, which tells the scheduler to delete previously computed data since it is not needed anymore. The scheduler, in turn, signals this to the workers.
Dispatcher.run_inner_node! — Method.
run_inner_node!(exec::DaskExecutor, node::DispatchNode, id::Int)
Submit the DispatchNode at position id in the DispatchGraph for scheduling and execution. Any error thrown during the node's execution is caught and wrapped in a DependencyError.
DaskDistributedDispatcher.retries — Method.
retries(exec::DaskExecutor) -> Int
Return the number of retries per node.
DaskDistributedDispatcher.retry_on — Method.
retry_on(exec::DaskExecutor) -> Vector{Function}
Return the array of retry conditions.
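For example (a minimal sketch assuming an already constructed executor):
n = retries(exec)        # number of times a failed node will be retried
conds = retry_on(exec)   # Vector{Function} of conditions under which to retry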
DaskDistributedDispatcher.dispatch! — Method.
dispatch!(exec::DaskExecutor, node::Dispatcher.DispatchNode) -> Future
dispatch! takes the DaskExecutor and a DispatchNode to run and submits the DispatchNode to the Client for scheduling.
This is the defining method of DaskExecutor.
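A minimal sketch (normally dispatch! is invoked by Dispatcher's run! as it walks the DispatchGraph; calling it directly is shown only for illustration):
node = Op(()->3)
fut = dispatch!(exec, node)   # submits the node via the Client and returns a Future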
Client
DaskDistributedDispatcher.Client — Type.
Client
Client that can be interacted with to submit computations to the scheduler and gather results. Should only be used directly for advanced workflows. See DaskExecutor instead for normal usage.
Fields
keys::Set{String}: previously submitted keys
id::String: this client's identifier
status::String: status of this client
scheduler_address::Address: the dask-distributed scheduler ip address and port info
scheduler::Rpc: manager for discrete send/receive open connections to the scheduler
scheduler_comm::Nullable{BatchedSend}: batched stream for communication with scheduler
pending_msg_buffer::Vector{Dict{String, Any}}: pending msgs to send to the scheduler
DaskDistributedDispatcher.Client — Method.
Client(scheduler_address::String) -> Client
Construct a Client which can then be used to submit computations or gather results from the dask-scheduler process.
Usage
using DaskDistributedDispatcher
using Dispatcher
addprocs(3)
@everywhere using DaskDistributedDispatcher
for i in 1:3
@spawn Worker("127.0.0.1:8786")
end
client = Client("127.0.0.1:8786")
op = Op(Int, 2.0)
submit(client, op)
result = fetch(op)
Previously submitted Ops can be cancelled by calling:
cancel(client, [op])
# Or if using the `DaskExecutor`
cancel(executor.client, [op])
If needed, the worker(s) on which to run the computations can be specified explicitly by returning the worker's address when starting a new worker:
using DaskDistributedDispatcher
client = Client("127.0.0.1:8786")
pnums = addprocs(1)
@everywhere using DaskDistributedDispatcher
worker_address = @fetchfrom pnums[1] begin
worker = Worker("127.0.0.1:8786")
return worker.address
end
op = Op(Int, 1.0)
submit(client, op, workers=[worker_address])
result = result(client, op)
DaskDistributedDispatcher.submit — Method.
submit(client::Client, node::DispatchNode; workers::Vector{Address}=Address[])
Submit the node computation unit to the dask-scheduler for computation. Also submits all of node's dependencies to the scheduler if they have not previously been submitted.
DaskDistributedDispatcher.cancel — Method.
cancel{T<:DispatchNode}(client::Client, nodes::Vector{T})
Cancel all DispatchNodes in nodes. This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible.
DaskDistributedDispatcher.gather — Method.
gather{T<:DispatchNode}(client::Client, nodes::Vector{T}) -> Vector
Gather the results of all nodes. Requires at least one worker to be available to the scheduler or it hangs indefinitely waiting for the results.
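For example, a minimal sketch assuming `op` was previously submitted and at least one Worker is running:
results = gather(client, [op])   # blocks until results for all listed nodes are available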
DaskDistributedDispatcher.replicate — Method.
replicate{T<:DispatchNode}(client::Client; nodes::Vector{T}=DispatchNode[])
Copy data onto many workers. Helps to broadcast frequently accessed data and improve resilience. By default replicates all nodes that have been submitted by this client unless they have been cancelled.
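For example (a sketch assuming `op` was previously submitted via this client):
replicate(client)               # replicate all data submitted by this client
replicate(client; nodes=[op])   # or only the listed nodes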
DaskDistributedDispatcher.shutdown — Method.
shutdown(client::Client)
Tell the dask-scheduler that this client is shutting down. Does NOT terminate the scheduler itself or the workers. This does not have to be called after a session but is useful for deleting all the information submitted by the client from the scheduler and workers (such as between test runs). To reconnect to the scheduler after calling this function, set up a new Client.
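For example (a minimal sketch using the scheduler's default address):
shutdown(client)                    # clear this client's information from the scheduler and workers
client = Client("127.0.0.1:8786")   # reconnect later by setting up a new client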
DaskDistributedDispatcher.get_key — Method.
get_key{T<:DispatchNode}(node::T)
Calculate an identifying key for node. Keys are re-used for identical nodes to avoid unnecessary computations.
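For example (a minimal sketch):
op = Op(max, 1, 2)
key = get_key(op)   # a key identifying this node to the scheduler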
ensure_connected(client::Client)
Ensure the client is connected to the dask-scheduler. For internal use.
send_to_scheduler(client::Client, msg::Dict{String, Any})
Send msg to the dask-scheduler that the client is connected to. For internal use.
DaskDistributedDispatcher.serialize_deps — Method.
serialize_deps{T<:DispatchNode}(args...) -> Tuple
Serialize dependencies to send to the scheduler.
Arguments
client::Client
deps::Vector{T}: the node dependencies to be serialized
keys::Vector{String}: list of all keys that have already been serialized
tasks::Dict{String, Dict{String, Vector{UInt8}}}: serialized tasks
tasks_deps::Dict{String, Vector{String}}: dependencies for each task
Returns
Tuple{Vector{String}, Dict{String, Dict{String, Vector{UInt8}}}, Dict{String, Vector{String}}}: keys, serialized tasks, and task dependencies that will be sent to the scheduler
DaskDistributedDispatcher.serialize_node — Method.
serialize_node(client::Client, node::DispatchNode) -> Tuple
Serialize node into its task and dependencies. For internal use.
Returns
Tuple{Vector{DispatchNode}, Dict{String, Vector{UInt8}}, Vector{String}}: tuple of the dependency nodes that have yet to be serialized, the serialized task, and the keys of the serialized task's dependencies that will be sent to the scheduler
DaskDistributedDispatcher.serialize_task — Method.
serialize_task{T<:DispatchNode}(client::Client, node::T, deps::Vector{T}) -> Dict
Serialize node into its components. For internal use.
Address
Address
A representation of an endpoint that can be connected to. It is categorized by its scheme (tcp is currently the only protocol supported), host, and port.
DaskDistributedDispatcher.Address — Method.
Address(address::String) -> Address
Parse address and return the corresponding Address object.
DaskDistributedDispatcher.Address — Method.
Address(host::IPAddr, port::Integer) -> Address
Return the Address object corresponding to the components host and port. By default the tcp protocol is assumed.
Base.show — Method.
show(io::IO, address::Address)
Print a representation of the address to io. The format used to represent addresses is "tcp://127.0.0.1:port".
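For example (a minimal sketch using the scheduler's default host and port):
addr = Address("127.0.0.1:8786")      # parsed from a string
addr = Address(ip"127.0.0.1", 8786)   # or built from host and port components
show(STDOUT, addr)                    # prints "tcp://127.0.0.1:8786"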
Base.connect — Method.
Base.connect(address::Address)
Open a tcp connection to address.
MsgPack.pack — Method.
MsgPack.pack(io::Base.AbstractIOBuffer{Vector{UInt8}}, address::Address)
Pack address as its string representation.
DaskDistributedDispatcher.parse_address — Method.
parse_address(address::String) -> (String, IPAddr, UInt16)
Parse an address into its scheme, host, and port components.
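For example (a minimal sketch):
scheme, host, port = parse_address("tcp://127.0.0.1:8786")   # ("tcp", ip"127.0.0.1", 8786 as a UInt16)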