API

DaskExecutor

DaskExecutor is an Executor which executes Julia computations as scheduled by the Python dask-scheduler. It can run computations either asynchronously or in parallel (if Workers are started on a Julia cluster instead).

DaskExecutor's dispatch!(::DaskExecutor, ::Dispatcher.DispatchNode) method will complete as long as there are no cycles in the computation graph, the dask-scheduler remains online, and there is at least one Worker that is listening to the dask-scheduler.

DaskExecutor(scheduler_address::String="127.0.0.1:8786")

Return a new DaskExecutor. The scheduler_address only needs to be included if the dask-scheduler is running on a different machine or not on its default port (8786).

NOTE: A dask-scheduler must be running at all times or the DaskExecutor execution will fail. If the scheduler is taken offline during execution for some reason, any remaining operations will fail to complete. Start a dask-scheduler from a terminal by typing dask-scheduler:

$ dask-scheduler
Start scheduler at 192.168.0.1:8786
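
For example, if the scheduler were started on another machine as shown above, the executor can be pointed at it explicitly (the address below is illustrative):

exec = DaskExecutor("192.168.0.1:8786")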

Prerequisites

Note that using the dask-scheduler and executing computations in a distributed manner can add overhead for simple tasks. Consider using an AsyncExecutor or ParallelExecutor if possible. The advantage of using the dask-scheduler is that it schedules computations in a manner that is short-term efficient and long-term fair.
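
For simple graphs that do not need a cluster, a minimal sketch using Dispatcher's AsyncExecutor looks like this (no dask-scheduler or Workers involved):

using Dispatcher
using ResultTypes

exec = AsyncExecutor()

a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)

results = run!(exec, DispatchGraph(c))

fetch(unwrap(results[1]))  # 4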

Usage

The DaskExecutor can run either asynchronously with the Workers, or in parallel if Workers are spawned on separate Julia processes in a cluster.

NOTE: Users must start up at least one Worker by pointing it to the dask-scheduler's address, or else run! will hang indefinitely.

Examples

  • Running asynchronously:

# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes

Worker()

exec = DaskExecutor()

a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)

results = run!(exec, DispatchGraph(c))

fetch(unwrap(results[1]))  # 4
  • Running in parallel:

# Reminder: make sure the dask-scheduler is running
using DaskDistributedDispatcher
using Dispatcher
using ResultTypes

addprocs(3)
@everywhere using DaskDistributedDispatcher

for i in 1:3
    cond = @spawn Worker()
    wait(cond)
end

exec = DaskExecutor()

a = Op(()->3)
b = Op(()->4)
c = Op(max, a, b)

results = run!(exec, DispatchGraph(c))

fetch(unwrap(results[1]))  # 4

To delete all previously computed information from the workers:

reset!(exec)

Advanced Workflows

It is possible to bypass the DaskExecutor and use the Client directly to submit computations, cancel previously scheduled DispatchNodes, gather results, or replicate data across all workers. See Client for more details. It is recommended to start with a DaskExecutor and access its client field if needed later on.
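
For example, the client behind an existing executor can be used directly (a sketch; c is the final Op from the examples above, and at least one Worker must be connected):

client = exec.client

# Gather the results of nodes that have already been submitted
vals = gather(client, [c])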

reset!(exec::DaskExecutor)

Restart the executor's Client, which tells the scheduler to delete previously computed data since it is no longer needed. The scheduler, in turn, signals this to the workers.

run_inner_node!(exec::DaskExecutor, node::DispatchNode, id::Int)

Submit the DispatchNode at position id in the DispatchGraph for scheduling and execution. Any error thrown during the node's execution is caught and wrapped in a DependencyError.

retries(exec::DaskExecutor) -> Int

Return the number of retries per node.

retry_on(exec::DaskExecutor) -> Vector{Function}

Return the array of retry conditions.
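
Both accessors can be called on any DaskExecutor (assuming a dask-scheduler is running, as above):

exec = DaskExecutor()

retries(exec)   # number of times a failed node will be retried
retry_on(exec)  # functions that decide whether a failure triggers a retry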

dispatch!(exec::DaskExecutor, node::Dispatcher.DispatchNode) -> Future

dispatch! takes the DaskExecutor and a DispatchNode to run and submits the DispatchNode to the Client for scheduling.

This is the defining method of DaskExecutor.


Client

Client

Client that can be interacted with to submit computations to the scheduler and gather results. Should only be used directly for advanced workflows. See DaskExecutor instead for normal usage.

Fields

  • keys::Set{String}: previously submitted keys

  • id::String: this client's identifier

  • status::String: status of this client

  • scheduler_address::Address: the dask-distributed scheduler ip address and port info

  • scheduler::Rpc: manager for discrete send/receive open connections to the scheduler

  • scheduler_comm::Nullable{BatchedSend}: batched stream for communication with scheduler

  • pending_msg_buffer::Vector{Dict{String, Any}}: pending msgs to send to the scheduler
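
A sketch of inspecting some of these fields (assumes a dask-scheduler is running at the default address):

client = Client("127.0.0.1:8786")

client.id      # this client's identifier
client.status  # current status string
client.keys    # keys of nodes submitted so far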

Client(scheduler_address::String) -> Client

Construct a Client which can then be used to submit computations or gather results from the dask-scheduler process.

Usage

using DaskDistributedDispatcher
using Dispatcher

addprocs(3)
@everywhere using DaskDistributedDispatcher

for i in 1:3
    @spawn Worker("127.0.0.1:8786")
end

client = Client("127.0.0.1:8786")

op = Op(Int, 2.0)
submit(client, op)
result = fetch(op)

Previously submitted Ops can be cancelled by calling:

cancel(client, [op])

# Or if using the `DaskExecutor`
cancel(executor.client, [op])

If needed, the worker(s) that a computation should run on can be specified explicitly by capturing the worker's address when starting a new worker and passing it to submit:

using DaskDistributedDispatcher
client = Client("127.0.0.1:8786")

pnums = addprocs(1)
@everywhere using DaskDistributedDispatcher

worker_address = @fetchfrom pnums[1] begin
    worker = Worker("127.0.0.1:8786")
    return worker.address
end

op = Op(Int, 1.0)
submit(client, op, workers=[worker_address])
res = result(client, op)
submit(client::Client, node::DispatchNode; workers::Vector{Address}=Address[])

Submit the node computation unit to the dask-scheduler for computation. Also submits all of node's dependencies to the scheduler if they have not previously been submitted.

cancel{T<:DispatchNode}(client::Client, nodes::Vector{T})

Cancel all DispatchNodes in nodes. This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible.

gather{T<:DispatchNode}(client::Client, nodes::Vector{T}) -> Vector

Gather the results of all nodes. Requires there to be at least one worker available to the scheduler or hangs indefinitely waiting for the results.

replicate{T<:DispatchNode}(client::Client; nodes::Vector{T}=DispatchNode[])

Copy data onto many workers. Helps to broadcast frequently accessed data and improve resilience. By default replicates all nodes that have been submitted by this client unless they have been cancelled.
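
For example (a sketch; op is an Op previously submitted through this client):

# Replicate everything this client has submitted
replicate(client)

# Or only specific nodes
replicate(client; nodes=[op])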

shutdown(client::Client)

Tell the dask-scheduler that this client is shutting down. Does NOT terminate the scheduler itself or the workers. This does not have to be called after a session, but it is useful for deleting all the information submitted by the client from the scheduler and workers (such as between test runs). To reconnect to the scheduler after calling this function, set up a new client.
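
For example, to clear out client state between test runs (a sketch):

shutdown(client)

# To talk to the scheduler again afterwards, construct a fresh client
client = Client("127.0.0.1:8786")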

get_key{T<:DispatchNode}(node::T)

Calculate an identifying key for node. Keys are re-used for identical nodes to avoid unnecessary computations.
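
For example (a sketch; the exact key format is an implementation detail):

op = Op(()->3)
get_key(op)  # the identifying key the scheduler uses for this node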

ensure_connected(client::Client)

Ensure the client is connected to the dask-scheduler. For internal use.

send_to_scheduler(client::Client, msg::Dict{String, Any})

Send msg to the dask-scheduler that the client is connected to. For internal use.

serialize_deps{T<:DispatchNode}(args...) -> Tuple

Serialize dependencies to send to the scheduler.

Arguments

  • client::Client

  • deps::Vector{T}: the node dependencies to be serialized

  • keys::Vector{String}: list of all keys that have already been serialized

  • tasks::Dict{String, Dict{String, Vector{UInt8}}}: serialized tasks

  • tasks_deps::Dict{String, Vector{String}}: dependencies for each task

Returns

  • Tuple{Vector{String}, Dict{String, Dict{String, Vector{UInt8}}}, Dict{String, Vector{String}}}: keys, serialized tasks, and task dependencies that will be sent to the scheduler

serialize_node(client::Client, node::DispatchNode) -> Tuple

Serialize node into its task and dependencies. For internal use.

Returns

  • Tuple{Vector{DispatchNode}, Dict{String, Vector{UInt8}}, Vector{String}}: tuple of the dependency nodes that are yet to be serialized, the serialized task, and the keys of the serialized task's dependencies that will be sent to the scheduler

serialize_task{T<:DispatchNode}(client::Client, node::T, deps::Vector{T}) -> Dict

Serialize node into its components. For internal use.


Address

Address

A representation of an endpoint that can be connected to. It is categorized by its scheme (tcp is currently the only protocol supported), host, and port.

Address(address::String) -> Address

Parse address and return the corresponding Address object.

Address(host::IPAddr, port::Integer) -> Address

Return the Address object corresponding to the components host and port. By default the tcp protocol is assumed.
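
For example, both of the following construct equivalent Address objects (a sketch; the string form matches the addresses used elsewhere in these docs):

Address("127.0.0.1:8786")
Address(ip"127.0.0.1", 8786)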

Base.show(io::IO, address::Address)

Print a representation of the address to io. The format used to represent addresses is "tcp://127.0.0.1:port".

Base.connect(address::Address)

Open a tcp connection to address.

MsgPack.pack(io::Base.AbstractIOBuffer{Vector{UInt8}}, address::Address)

Pack address as its string representation.

parse_address(address::String) -> (String, IPAddr, UInt16)

Parse an address into its scheme, host, and port components.
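
For example (a sketch; the port is returned as a UInt16):

parse_address("tcp://127.0.0.1:8786")  # ("tcp", ip"127.0.0.1", 0x2252), i.e. port 8786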
