Workers
Julia workers were developed to integrate with the python dask-scheduler, and hence follow many of the same patterns that the python dask-workers do.
Notable Differences
The julia workers don't execute computations in a thread pool but rather do so asynchronously. The recommended way to set up the workers is to use addprocs and spawn at least one Worker per julia process added to the cluster. Currently the julia workers do not support specifying resources needed by computations or spilling excess data onto disk.
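For example, a minimal sketch of this setup (assuming the dask-scheduler is already running locally on its default port and that three extra julia processes are wanted):

pids = addprocs(3)                           # add three julia processes to the cluster
@everywhere using DaskDistributedDispatcher  # load the package on every process

# spawn one Worker on each added process; each connects to the dask-scheduler asynchronously
for pid in pids
    @spawnat pid Worker()
end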
Tasks
Usually computations submitted to a worker go through task states in the following order:
waiting -> ready -> executing -> memory
Computations that throw errors are caught and the error is saved in memory. Workers communicate with each other to gather dependencies, and with the dask-scheduler.
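As a rough illustration only (assuming direct access to the task_state and data fields documented below, with key standing in for a task identifier assigned by the scheduler):

worker = Worker()
# once the scheduler has assigned a task identified by `key`:
worker.task_state[key]   # progresses through :waiting, :ready, :executing, :memory
worker.data[key]         # holds the task's result (or its caught error) once in :memory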
API
DaskDistributedDispatcher.Worker
— Type.Worker
A Worker
represents a worker endpoint in the distributed cluster. It accepts instructions from the scheduler, fetches dependencies, executes computations, stores data, and communicates state to the scheduler.
Fields
status::Symbol: status of this worker
address::Address: ip address and port that this worker is listening on
listener::Base.TCPServer: tcp server that listens for incoming connections
scheduler_address::Address: the dask-distributed scheduler ip address and port info
batched_stream::Nullable{BatchedSend}: batched stream for communication with the scheduler
scheduler::Rpc: manager for discrete send/receive open connections to the scheduler
connection_pool::ConnectionPool: manages connections to peers
handlers::Dict{String, Function}: handlers for operations requested by open connections
compute_stream_handlers::Dict{String, Function}: handlers for compute stream operations
transitions::Dict{Tuple, Function}: valid transitions that a task can make
data_needed::Deque{String}: keys whose data we still lack
ready::PriorityQueue{String, Tuple, Base.Order.ForwardOrdering}: keys ready to run
data::Dict{String, Any}: maps keys to the results of function calls (actual values)
tasks::Dict{String, Tuple}: maps keys to the function, args, and kwargs of a task
task_state::Dict{String, Symbol}: maps keys to their state (waiting, executing, memory)
priorities::Dict{String, Tuple}: run time order priority of a key given by the scheduler
priority_counter::Int: used to prioritize tasks by their order of arrival
dep_transitions::Dict{Tuple, Function}: valid transitions that a dependency can make
dep_state::Dict{String, Symbol}: maps dependencies to their state (waiting, flight, memory)
dependencies::Dict{String, Set}: maps a key to the data it needs to run
dependents::Dict{String, Set}: maps a dependency to the keys that use it
waiting_for_data::Dict{String, Set}: maps a key to the data it needs that we don't have
pending_data_per_worker::DefaultDict{String, Deque}: data per worker that we want
who_has::Dict{String, Set}: maps keys to the workers believed to have their data
has_what::DefaultDict{String, Set{String}}: maps workers to the data they have
in_flight_workers::Dict{String, Set}: workers from which we are fetching data
missing_dep_flight::Set{String}: missing dependencies
DaskDistributedDispatcher.Worker
— Method.Worker(scheduler_address::String="127.0.0.1:8786")
Create a Worker
that listens on a random port between 1024 and 9000 for incoming messages. If the scheduler's address is not provided, it is assumed that the dask-scheduler is running on the same machine on its default port 8786.
NOTE: Workers must be started in the same julia cluster as the DaskExecutor (and its Client).
Usage
Worker() # The dask-scheduler is being run on the same machine on its default port 8786.
or, equivalently:
Worker("$(getipaddr()):8786") # Scheduler is running on the same machine
If running the dask-scheduler on a different machine or port:
First start the dask-scheduler and inspect its startup logs:
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
distributed.scheduler - INFO - etc.
distributed.scheduler - INFO - -----------------------------------------------
Then start workers with its printed address:
Worker("tcp://127.0.0.1:8786")
No further actions are needed directly on the Workers themselves as they will communicate with the dask-scheduler independently. New Workers can be added/removed at any time during execution. There should usually be at least one Worker to run computations.
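For instance, a sketch of adding one more Worker while computations are running (the scheduler address shown is the one printed in the startup logs above; adjust as needed):

new_pid = addprocs(1)[1]                      # add a fresh julia process to the cluster
@everywhere using DaskDistributedDispatcher
@spawnat new_pid Worker("tcp://127.0.0.1:8786")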
Cleanup
To explicitly shut down a worker and delete its information use:
worker = Worker()
shutdown([worker.address])
It is more effective to explicitly reset the DaskExecutor
or shut down a Client
rather than a Worker
because the dask-scheduler will automatically re-schedule the lost computations on other Workers
if it thinks that a Client
still needs the lost data.
Workers are lost if they were spawned on a julia process that exits or is removed via rmprocs from the julia cluster. It is cleaner, but not necessary, to explicitly call shutdown if planning to remove a Worker.
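For example, a sketch of removing a Worker cleanly (worker and pid are illustrative names for a Worker and the julia process it was spawned on in an earlier setup):

shutdown([worker.address])   # optionally terminate the Worker and delete its information
rmprocs(pid)                 # then remove the julia process it was running on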
DaskDistributedDispatcher.shutdown
— Method.shutdown(workers::Vector{Address})
Connect to and terminate all workers in workers
.
Base.show
— Method.show(io::IO, worker::Worker)
Print a representation of the worker and its state.
Internals
DaskDistributedDispatcher.start
— Method.start(worker::Worker)
Coordinate a worker's startup.
DaskDistributedDispatcher.register
— Method.register(worker::Worker)
Register a Worker
with the dask-scheduler process.
DaskDistributedDispatcher.handle_comm
— Method.handle_comm(worker::Worker, comm::TCPSocket)
Listen for incoming messages on an established connection.
Base.close
— Method.Base.close(worker::Worker; report::String="true")
Close the worker and all the connections it has open.
DaskDistributedDispatcher.get_data
— Method.get_data(worker::Worker; keys::Array=String[], who::String="") -> Dict
Send the results of keys
back over the stream they were requested on.
Returns
Dict{String, Vector{UInt8}}: dictionary mapping keys to their serialized data for communication
DaskDistributedDispatcher.gather
— Method.gather(worker::Worker; who_has::Dict=Dict{String, Vector{String}}())
Gather the results for various keys.
DaskDistributedDispatcher.update_data
— Method.update_data(worker::Worker; data::Dict=Dict(), report::String="true") -> Dict
Update the worker data.
DaskDistributedDispatcher.delete_data
— Method.delete_data(worker::Worker; keys::Array=String[], report::String="true")
Delete the data associated with each key of keys
in worker.data
.
DaskDistributedDispatcher.terminate
— Method.terminate(worker::Worker; report::String="true")
Shut down the worker and close all its connections.
DaskDistributedDispatcher.get_keys
— Method.get_keys(worker::Worker) -> Vector{String}
Get a list of all the keys held by this worker for communication with scheduler and other workers.
DaskDistributedDispatcher.add_task
— Method.add_task(worker::Worker; kwargs...)
Add a task to the worker's list of tasks to be computed.
Keywords
key::String: The task's unique identifier. Throws an exception if blank.
priority::Array: The priority of the task. Throws an exception if blank.
who_has::Dict: Map of dependent keys and the addresses of the workers that have them.
func::Union{String, Vector{UInt8}}: The callable function for the task, serialized.
args::Union{String, Vector{UInt8}}: The arguments for the task, serialized.
kwargs::Union{String, Vector{UInt8}}: The keyword arguments for the task, serialized.
future::Union{String, Vector{UInt8}}: The task's serialized DeferredFuture.
DaskDistributedDispatcher.release_key
— Method.release_key(worker::Worker; key::String="", cause::String="", reason::String="")
Delete a key and its data.
DaskDistributedDispatcher.release_dep
— Method.release_dep(worker::Worker, dep::String)
Delete a dependency key and its data.
ensure_computing(worker::Worker)
Make sure the worker is computing available tasks.
DaskDistributedDispatcher.execute
— Method.execute(worker::Worker, key::String)
Execute the task identified by key
.
put_key_in_memory(worker::Worker, key::String, value; should_transition::Bool=true)
Store the result (value
) of the task identified by key
.
ensure_communicating(worker::Worker)
Ensure the worker is communicating with its peers to gather dependencies as needed.
DaskDistributedDispatcher.gather_dep
— Method.gather_dep(worker::Worker, worker_addr::String, dep::String, deps::Set; cause::String="")
Gather the dependency with identifier "dep" from worker_addr
.
handle_missing_dep(worker::Worker, deps::Set{String})
Handle a missing dependency that can't be found on any peers.
DaskDistributedDispatcher.update_who_has
— Method.update_who_has(worker::Worker, who_has::Dict{String, Vector{String}})
Ensure who_has
is up to date and accurate.
select_keys_for_gather(worker::Worker, worker_addr::String, dep::String)
Select which keys to gather from peer at worker_addr
.
gather_from_workers(who_has::Dict, connection_pool::ConnectionPool) -> Tuple
Gather data directly from who_has
peers.
DaskDistributedDispatcher.transition
— Method.transition(worker::Worker, key::String, finish_state::Symbol; kwargs...)
Transition task with identifier key
to finish_state from its current state.
DaskDistributedDispatcher.transition_dep
— Method.transition_dep(worker::Worker, dep::String, finish_state::Symbol; kwargs...)
Transition the dependency with identifier dep
to finish_state from its current state.
send_task_state_to_scheduler(worker::Worker, key::String)
Send the state of task key
to the scheduler.
deserialize_task(func, args, kwargs, future) -> Tuple
Deserialize task inputs and regularize to func, args, kwargs.
Returns
Tuple: The deserialized function, arguments, keyword arguments, and DeferredFuture for the task.
DaskDistributedDispatcher.apply_function
— Method.apply_function(key::String, func::Base.Callable, args::Any, kwargs::Any)
Run a function and return collected information.