API
DaskExecutor
DaskExecutor is an Executor which executes Julia computations as scheduled by the Python dask-scheduler. It can run computations either asynchronously or in parallel (if Workers are started on a Julia cluster instead).
DaskExecutor's dispatch!(::DaskExecutor, ::Dispatcher.DispatchNode) method will complete as long as there are no cycles in the computation graph, the dask-scheduler remains online, and there is at least one Worker listening to the dask-scheduler.
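The "no cycles" condition follows from how a dependency graph is scheduled: a node is released only once all of its dependencies have finished, so nodes on a cycle wait on each other forever. The following self-contained Julia 1.x sketch checks this with Kahn's algorithm. It does not call DaskDistributedDispatcher; the function name is_acyclic and the Dict-of-keys graph representation are hypothetical, chosen only for illustration, and every dependency is assumed to also appear as a key.

```julia
# Hypothetical graph representation: each key maps to the keys it depends on.
# Returns true when every node can eventually run, i.e. the graph has no cycle.
function is_acyclic(deps::Dict{String, Vector{String}})
    # Number of unfinished dependencies for each node.
    pending = Dict(k => length(ds) for (k, ds) in deps)
    # Reverse edges: for each key, the nodes waiting on it.
    waiting = Dict{String, Vector{String}}()
    for (k, ds) in deps, d in ds
        push!(get!(waiting, d, String[]), k)
    end
    # Start from nodes with no dependencies (Kahn's algorithm).
    ready = [k for (k, n) in pending if n == 0]
    finished = 0
    while !isempty(ready)
        k = pop!(ready)
        finished += 1
        for w in get(waiting, k, String[])
            pending[w] -= 1
            pending[w] == 0 && push!(ready, w)
        end
    end
    # Nodes on a cycle never reach zero pending dependencies.
    return finished == length(deps)
end
```

For example, Dict("x" => ["y"], "y" => ["x"]) returns false: neither node ever becomes ready, which is the situation in which dispatch! could not complete.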
DaskDistributedDispatcher.DaskExecutor
— Method.DaskExecutor(scheduler_address::String="127.0.0.1:8786")
Return a new DaskExecutor. The scheduler_address only needs to be included if the dask-scheduler is running on a different machine or not on its default port (8786).
NOTE: A dask-scheduler must be running at all times or DaskExecutor execution will fail. If the scheduler is taken offline during execution for any reason, any remaining operations will fail to complete. Start a dask-scheduler from a terminal by typing dask-scheduler:
$ dask-scheduler
Start scheduler at 192.168.0.1:8786
Prerequisites
Python 2.7 or 3.5
the Python dask.distributed package (see the dask.distributed installation instructions)
Note that using the dask-scheduler and executing computations in a distributed manner can add overhead for simple tasks. Consider using an AsyncExecutor or ParallelExecutor if possible. The advantage of using the dask-scheduler is that it schedules computations in a manner that is short-term-efficient and long-term-fair.
Usage
The DaskExecutor can run either asynchronously with the Workers, or in parallel if Workers are spawned on separate Julia processes in a cluster.
NOTE: Users must start up at least one Worker by pointing it to the dask-scheduler's address, or else run! will hang indefinitely.
Examples
Running asynchronously:
# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes
Worker()
exec = DaskExecutor()
a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)
results = run!(exec, DispatchGraph(c))
fetch(unwrap(results[1])) # 4
Running in parallel:
# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes
addprocs(3)
@everywhere using DaskDistributedDispatcher
for i in 1:3
cond = @spawn Worker()
wait(cond)
end
exec = DaskExecutor()
a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)
results = run!(exec, DispatchGraph(c))
fetch(unwrap(results[1])) # 4
To delete all previously computed information from the workers:
reset!(exec)
Advanced Workflows
It is possible to bypass the DaskExecutor and use the Client directly to submit computations, cancel previously scheduled DispatchNodes, gather results, or replicate data across all workers. See Client for more details. It is recommended to start with a DaskExecutor and access its client field if needed later on.
DaskDistributedDispatcher.reset!
— Method.reset!(exec::DaskExecutor)
Restart the executor's Client, which tells the scheduler to delete previously computed data since it is no longer needed. The scheduler, in turn, signals this to the workers.
Dispatcher.run_inner_node!
— Method.run_inner_node!(exec::DaskExecutor, node::DispatchNode, id::Int)
Submit the DispatchNode at position id in the DispatchGraph for scheduling and execution. Any error thrown during the node's execution is caught and wrapped in a DependencyError.
DaskDistributedDispatcher.retries
— Method.retries(exec::DaskExecutor) -> Int
Return the number of retries per node.
DaskDistributedDispatcher.retry_on
— Method.retry_on(exec::DaskExecutor) -> Vector{Function}
Return the array of retry conditions.
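The retries and retry_on settings suggest a policy along these lines: when a node throws, it is re-run only if some retry condition accepts the error and retries remain. The sketch below is a hypothetical, self-contained Julia 1.x illustration of that policy; call_with_retries is not part of the package, and the exact semantics DaskExecutor applies may differ.

```julia
# Hypothetical helper, not part of DaskDistributedDispatcher:
# call `f()` and retry up to `retries` extra times while the thrown
# exception satisfies at least one predicate in `retry_on`.
function call_with_retries(f, retries::Int, retry_on::Vector{Function})
    attempt = 0
    while true
        try
            return f()
        catch e
            attempt += 1
            # Give up when retries are exhausted or no condition matches this error.
            if attempt > retries || !any(cond -> cond(e), retry_on)
                rethrow()
            end
        end
    end
end
```

With retries = 2, a function that always fails is called three times in total before the last error propagates; an error that no condition accepts propagates after the first call.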
DaskDistributedDispatcher.dispatch!
— Method.dispatch!(exec::DaskExecutor, node::Dispatcher.DispatchNode) -> Future
dispatch! takes the DaskExecutor and a DispatchNode to run, and submits the DispatchNode to the Client for scheduling.
This is the defining method of DaskExecutor.
Client
DaskDistributedDispatcher.Client
— Type.Client
Client that can be interacted with to submit computations to the scheduler and gather results. Should only be used directly for advanced workflows. See DaskExecutor instead for normal usage.
Fields
keys::Set{String}: previously submitted keys
id::String: this client's identifier
status::String: status of this client
scheduler_address::Address: the dask-distributed scheduler ip address and port info
scheduler::Rpc: manager for discrete send/receive open connections to the scheduler
scheduler_comm::Nullable{BatchedSend}: batched stream for communication with the scheduler
pending_msg_buffer::Vector{Dict{String, Any}}: pending msgs to send to the scheduler
DaskDistributedDispatcher.Client
— Method.Client(scheduler_address::String) -> Client
Construct a Client which can then be used to submit computations or gather results from the dask-scheduler process.
Usage
using DaskDistributedDispatcher
using Dispatcher
addprocs(3)
@everywhere using DaskDistributedDispatcher
for i in 1:3
@spawn Worker("127.0.0.1:8786")
end
client = Client("127.0.0.1:8786")
op = Op(Int, 2.0)
submit(client, op)
result = fetch(op)
Previously submitted Ops can be cancelled by calling:
cancel(client, [op])
# Or, if using a `DaskExecutor` (e.g. `exec = DaskExecutor()`)
cancel(exec.client, [op])
If needed, the worker(s) that run a computation can be specified explicitly: return the worker's address when starting a new worker and pass it to submit through the workers keyword:
using DaskDistributedDispatcher
client = Client("127.0.0.1:8786")
pnums = addprocs(1)
@everywhere using DaskDistributedDispatcher
worker_address = @fetchfrom pnums[1] begin
worker = Worker("127.0.0.1:8786")
return worker.address
end
op = Op(Int, 1.0)
submit(client, op, workers=[worker_address])
result = fetch(op)
DaskDistributedDispatcher.submit
— Method.submit(client::Client, node::DispatchNode; workers::Vector{Address}=Address[])
Submit the node computation unit to the dask-scheduler for computation. Also submits all of node's dependencies to the scheduler if they have not previously been submitted.
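The dependency-first behaviour of submit can be sketched in plain Julia 1.x. Node and submit_with_deps! are hypothetical stand-ins, not package API; the order vector merely records what would be sent to the scheduler, and the Set{String} plays the role of the Client's keys field. The sketch assumes an acyclic graph, as DaskExecutor requires.

```julia
# Hypothetical stand-in for a DispatchNode: a key plus the nodes it depends on.
struct Node
    key::String
    deps::Vector{Node}
end

# Submit `node`, submitting its dependencies first and skipping any key already
# recorded in `submitted` (mirroring the Client's `keys::Set{String}` field).
function submit_with_deps!(submitted::Set{String}, order::Vector{String}, node::Node)
    node.key in submitted && return order
    for dep in node.deps
        submit_with_deps!(submitted, order, dep)
    end
    push!(submitted, node.key)
    push!(order, node.key)  # stands in for sending the task to the scheduler
    return order
end
```

In a diamond-shaped graph, the shared dependency is submitted only once, and a later submission of an already-known node sends nothing new.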
DaskDistributedDispatcher.cancel
— Method.cancel{T<:DispatchNode}(client::Client, nodes::Vector{T})
Cancel all DispatchNodes in nodes. This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, these results and all dependent results will no longer be accessible.
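Because cancelling a node also makes every result that depends on it inaccessible, the affected set is the transitive closure over dependents. A minimal Julia 1.x sketch of that traversal, assuming a hypothetical dependents map from each key to the keys that consume it (affected_keys is illustrative, not a package function):

```julia
# Hypothetical sketch: `dependents[k]` lists the keys whose tasks consume `k`.
# Returns every key that becomes inaccessible when `cancelled` keys are cancelled.
function affected_keys(dependents::Dict{String, Vector{String}}, cancelled::Vector{String})
    affected = Set{String}()
    stack = copy(cancelled)
    while !isempty(stack)
        k = pop!(stack)
        k in affected && continue          # already visited
        push!(affected, k)
        append!(stack, get(dependents, k, String[]))
    end
    return affected
end
```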
DaskDistributedDispatcher.gather
— Method.gather{T<:DispatchNode}(client::Client, nodes::Vector{T}) -> Vector
Gather the results of all nodes. Requires at least one worker to be available to the scheduler; otherwise it hangs indefinitely waiting for the results.
DaskDistributedDispatcher.replicate
— Method.replicate{T<:DispatchNode}(client::Client; nodes::Vector{T}=DispatchNode[])
Copy data onto many workers. Helps to broadcast frequently accessed data and improve resilience. By default, replicates all nodes submitted by this client that have not been cancelled.
DaskDistributedDispatcher.shutdown
— Method.shutdown(client::Client)
Tell the dask-scheduler that this client is shutting down. Does NOT terminate the scheduler itself or the workers. This does not have to be called after a session, but it is useful for deleting all the information submitted by the client from the scheduler and workers (such as between test runs). To reconnect to the scheduler after calling this function, set up a new client.
DaskDistributedDispatcher.get_key
— Method.get_key{T<:DispatchNode}(node::T)
Calculate an identifying key for node. Keys are re-used for identical nodes to avoid unnecessary computations.
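Reusing keys for identical nodes amounts to content-based keying plus a cache. The Julia 1.x sketch below is hypothetical: node_key and compute_once are illustrative names, and hashing the function together with its arguments is an assumed scheme, not necessarily the one get_key uses.

```julia
# Hypothetical content-based key: same function + same arguments => same key.
node_key(f, args...) = string(nameof(f), "-", string(hash((f, args)), base = 16))

# Results cached by key, so identical computations run only once.
const cache = Dict{String, Any}()
computations = Ref(0)

function compute_once(f, args...)
    key = node_key(f, args...)
    return get!(cache, key) do
        computations[] += 1   # only runs on a cache miss
        f(args...)
    end
end
```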
ensure_connected(client::Client)
Ensure the client is connected to the dask-scheduler. For internal use.
send_to_scheduler(client::Client, msg::Dict{String, Any})
Send msg to the dask-scheduler that the client is connected to. For internal use.
DaskDistributedDispatcher.serialize_deps
— Method.serialize_deps{T<:DispatchNode}(args...) -> Tuple
Serialize dependencies to send to the scheduler.
Arguments
client::Client
deps::Vector{T}: the node dependencies to be serialized
keys::Vector{String}: list of all keys that have already been serialized
tasks::Dict{String, Dict{String, Vector{UInt8}}}: serialized tasks
tasks_deps::Dict{String, Vector{String}}: dependencies for each task
Returns
Tuple{Vector{String}, Dict{String, Dict{String, Vector{UInt8}}}, Dict{String, Vector{String}}}: keys, serialized tasks, and task dependencies that will be sent to the scheduler
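The arguments and return value above describe a traversal that serializes each not-yet-seen dependency once and records which keys it depends on. A hypothetical Julia 1.x sketch with the same output shapes follows; TaskNode stands in for DispatchNode, serialize_deps_sketch is an illustrative name, and the payload string's UTF-8 bytes stand in for real serialization.

```julia
# Hypothetical stand-in for a DispatchNode: a key, a payload, and its dependencies.
struct TaskNode
    key::String
    payload::String
    deps::Vector{TaskNode}
end

# Walk `deps` and everything they depend on, serializing each task once.
# Returns (keys, tasks, tasks_deps) with the documented tuple's types.
function serialize_deps_sketch(deps::Vector{TaskNode},
                               keys::Vector{String} = String[],
                               tasks = Dict{String, Dict{String, Vector{UInt8}}}(),
                               tasks_deps = Dict{String, Vector{String}}())
    stack = copy(deps)
    while !isempty(stack)
        node = pop!(stack)
        node.key in keys && continue      # already serialized
        push!(keys, node.key)
        # Toy "serialization": the payload's UTF-8 bytes.
        tasks[node.key] = Dict("payload" => Vector{UInt8}(node.payload))
        tasks_deps[node.key] = [d.key for d in node.deps]
        append!(stack, node.deps)
    end
    return keys, tasks, tasks_deps
end
```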
DaskDistributedDispatcher.serialize_node
— Method.serialize_node(client::Client, node::DispatchNode) -> Tuple
Serialize node into its task and dependencies. For internal use.
Returns
Tuple{Vector{DispatchNode}, Dict{String, Vector{UInt8}}, Vector{String}}: a tuple of the dependency nodes that have yet to be serialized, the serialized task, and the keys of the serialized task's dependencies, which will be sent to the scheduler
DaskDistributedDispatcher.serialize_task
— Method.serialize_task{T<:DispatchNode}(client::Client, node::T, deps::Vector{T}) -> Dict
Serialize node into its components. For internal use.
Address
Address
A representation of an endpoint that can be connected to. It is categorized by its scheme (tcp is currently the only protocol supported), host, and port.
DaskDistributedDispatcher.Address
— Method.Address(address::String) -> Address
Parse address and return the corresponding Address object.
DaskDistributedDispatcher.Address
— Method.Address(host::IPAddr, port::Integer) -> Address
Return the Address object corresponding to the components host and port. By default the tcp protocol is assumed.
Base.show
— Method.show(io::IO, address::Address)
Print a representation of the address to io. The format used to represent addresses is "tcp://127.0.0.1:port".
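A minimal Julia 1.x sketch of an address type that prints in the format described above. ToyAddress is a hypothetical mirror of Address, not the package's type; it uses IPAddr from the Sockets standard library and defaults the scheme to tcp.

```julia
using Sockets

# Hypothetical mirror of the documented Address: scheme, host, and port.
struct ToyAddress
    scheme::String
    host::IPAddr
    port::UInt16
end

# tcp is the default scheme; UInt16(port) throws for out-of-range ports.
ToyAddress(host::IPAddr, port::Integer) = ToyAddress("tcp", host, UInt16(port))

# Print as "scheme://host:port"; printing an IPAddr gives its dotted form.
Base.show(io::IO, a::ToyAddress) = print(io, a.scheme, "://", a.host, ":", Int(a.port))
```

For example, ToyAddress(ip"127.0.0.1", 8786) prints as tcp://127.0.0.1:8786.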
Base.connect
— Method.Base.connect(address::Address)
Open a tcp connection to address.
MsgPack.pack
— Method.MsgPack.pack(io::Base.AbstractIOBuffer{Vector{UInt8}}, address::Address)
Pack address as its string representation.
DaskDistributedDispatcher.parse_address
— Method.parse_address(address::String) -> (String, IPAddr, UInt16)
Parse an address into its scheme, host, and port components.
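parse_address can be approximated with standard string functions and the IPAddr parser from Sockets. The Julia 1.x sketch below is hypothetical (parse_address_sketch is not the package's implementation). Defaulting to tcp when no scheme is present is an assumption, and an address without a port will throw.

```julia
using Sockets

# Hypothetical re-implementation for illustration: split "scheme://host:port"
# into (scheme, host, port), defaulting the scheme to tcp when it is absent.
function parse_address_sketch(address::String)
    scheme, rest = occursin("://", address) ? split(address, "://"; limit = 2) : ("tcp", address)
    # Split on the last colon so the port is always the final component.
    host, port = rsplit(rest, ":"; limit = 2)
    return String(scheme), parse(IPAddr, host), parse(UInt16, port)
end
```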