Workers

These Julia workers were developed to integrate with the python dask-scheduler, and hence follow many of the same patterns as the python dask-workers.

Notable Differences

Tasks

Usually computations submitted to a worker go through task states in the following order:

waiting -> ready -> executing -> memory

Computations that throw errors are caught, and the error is saved in memory in place of the result. Workers communicate among themselves to gather dependencies, and with the dask-scheduler to report state.
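
The task bookkeeping can be inspected directly on a Worker, as in the following sketch; the key name shown is hypothetical and it assumes a dask-scheduler has already assigned work to this worker:

worker = Worker()
# Each assigned key appears in task_state and moves through
# :waiting -> :ready -> :executing -> :memory
for (key, state) in worker.task_state
    println(key, " => ", state)
end
# Once a key reaches :memory, its result (or the caught error) is stored in worker.data
haskey(worker.data, "some-key") && println(worker.data["some-key"])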

API

Worker

A Worker represents a worker endpoint in the distributed cluster. It accepts instructions from the scheduler, fetches dependencies, executes computations, stores data, and communicates state to the scheduler.

Fields

  • status::Symbol: status of this worker

  • address::Address: ip address and port that this worker is listening on

  • listener::Base.TCPServer: tcp server that listens for incoming connections

  • scheduler_address::Address: the dask-distributed scheduler ip address and port info

  • batched_stream::Nullable{BatchedSend}: batched stream for communication with scheduler

  • scheduler::Rpc: manager for discrete send/receive open connections to the scheduler

  • connection_pool::ConnectionPool: manages connections to peers

  • handlers::Dict{String, Function}: handlers for operations requested by open connections

  • compute_stream_handlers::Dict{String, Function}: handlers for compute stream operations

  • transitions::Dict{Tuple, Function}: valid transitions that a task can make

  • data_needed::Deque{String}: keys whose data we still lack

  • ready::PriorityQueue{String, Tuple, Base.Order.ForwardOrdering}: keys ready to run

  • data::Dict{String, Any}: maps keys to the results of function calls (actual values)

  • tasks::Dict{String, Tuple}: maps keys to the function, args, and kwargs of a task

  • task_state::Dict{String, Symbol}: maps keys to their state (waiting, executing, memory)

  • priorities::Dict{String, Tuple}: run time order priority of a key given by the scheduler

  • priority_counter::Int: used to prioritize tasks by their order of arrival

  • dep_transitions::Dict{Tuple, Function}: valid transitions that a dependency can make

  • dep_state::Dict{String, Symbol}: maps dependencies to their state (waiting, flight, memory)

  • dependencies::Dict{String, Set}: maps a key to the data it needs to run

  • dependents::Dict{String, Set}: maps a dependency to the keys that use it

  • waiting_for_data::Dict{String, Set}: maps a key to the data it needs that we don't have

  • pending_data_per_worker::DefaultDict{String, Deque}: data per worker that we want

  • who_has::Dict{String, Set}: maps keys to the workers believed to have their data

  • has_what::DefaultDict{String, Set{String}}: maps workers to the data they have

  • in_flight_workers::Dict{String, Set}: workers from which we are currently fetching data

  • missing_dep_flight::Set{String}: missing dependencies
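
These bookkeeping fields relate to one another as in the sketch below; the key names are hypothetical and assume the scheduler has assigned a task "y" that depends on a key "x" held by a peer:

worker.dependencies["y"]      # Set containing "x": the data "y" needs before it can run
worker.dependents["x"]        # Set containing "y": the keys that use "x"
worker.who_has["x"]           # addresses of peers believed to hold "x"
worker.dep_state["x"]         # :waiting, then :flight while being fetched, then :memory
worker.waiting_for_data["y"]  # shrinks as dependencies arrive; empty means "y" can run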

source
Worker(scheduler_address::String="127.0.0.1:8786")

Create a Worker that listens on a random port between 1024 and 9000 for incoming messages. By default, if the scheduler's address is not provided, it is assumed that the dask-scheduler is running on the same machine on the default port 8786.

NOTE: Workers must be started in the same julia cluster as the DaskExecutor (and its Client).

Usage

Worker()  # The dask-scheduler is being run on the same machine on its default port 8786.

or

Worker("$(getipaddr()):8786") # Scheduler is running on the same machine

If running the dask-scheduler on a different machine or port:

  • First start the dask-scheduler and inspect its startup logs:

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:   tcp://127.0.0.1:8786
distributed.scheduler - INFO - etc.
distributed.scheduler - INFO - -----------------------------------------------
  • Then start workers with its printed address:

Worker("tcp://127.0.0.1:8786")

No further actions are needed directly on the Workers themselves, as they will communicate with the dask-scheduler independently. New Workers can be added or removed at any time during execution, but there should usually be at least one Worker available to run computations.
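
For example, several Workers can be started across the processes of the local julia cluster. This is only a sketch: it assumes the package providing Worker is DaskDistributedDispatcher and that a dask-scheduler is already running on the default port.

addprocs(3)                                    # add three julia processes to the cluster
@everywhere using DaskDistributedDispatcher    # assumed package name providing Worker
# note: workers() below returns julia process ids, not dask Workers
worker_refs = [@spawnat(p, Worker()) for p in workers()]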

Cleanup

To explicitly shut down a worker and delete its information, use:

worker = Worker()
shutdown([worker.address])

It is more effective to explicitly reset the DaskExecutor or shut down a Client than to shut down a Worker, because the dask-scheduler will automatically re-schedule the lost computations on other Workers if it thinks a Client still needs the lost data.

Workers are lost if the julia process they were spawned on exits or is removed from the julia cluster via rmprocs. It is cleaner, but not necessary, to explicitly call shutdown before removing a Worker.
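
For example, if a Worker was spawned on julia process 2, removing that process also removes the Worker (a sketch; the @spawnat setup is hypothetical):

ref = @spawnat 2 Worker()   # start a Worker on julia process 2
# ... run computations ...
rmprocs(2)                  # the Worker is lost along with the process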

source
shutdown(workers::Vector{Address})

Connect to and terminate all workers in workers.
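
For example, several locally created Workers can be terminated at once by passing their addresses (a sketch, assuming ws holds Worker objects created in this session):

ws = [Worker() for _ in 1:3]
shutdown([w.address for w in ws])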

source
show(io::IO, worker::Worker)

Print a representation of the worker and its state.

source

Internals

start(worker::Worker)

Coordinate a worker's startup.

source
register(worker::Worker)

Register a Worker with the dask-scheduler process.

source
handle_comm(worker::Worker, comm::TCPSocket)

Listen for incoming messages on an established connection.

source
Base.close(worker::Worker; report::String="true")

Close the worker and all the connections it has open.

source
get_data(worker::Worker; keys::Array=String[], who::String="") -> Dict

Send the results of keys back over the stream they were requested on.

Returns

  • Dict{String, Vector{UInt8}}: dictionary mapping keys to their serialized data for communication

source
gather(worker::Worker; who_has::Dict=Dict{String, Vector{String}}())

Gather the results for various keys.

source
update_data(worker::Worker; data::Dict=Dict(), report::String="true") -> Dict

Update the worker data.

source
delete_data(worker::Worker; keys::Array=String[], report::String="true")

Delete the data associated with each key of keys in worker.data.

source
terminate(worker::Worker; report::String="true")

Shutdown the worker and close all its connections.

source
get_keys(worker::Worker) -> Vector{String}

Get a list of all the keys held by this worker for communication with scheduler and other workers.

source
add_task(worker::Worker; kwargs...)

Add a task to the worker's list of tasks to be computed.

Keywords

  • key::String: The task's unique identifier. Throws an exception if blank.

  • priority::Array: The priority of the task. Throws an exception if blank.

  • who_has::Dict: Map of dependent keys and the addresses of the workers that have them.

  • func::Union{String, Vector{UInt8}}: The callable function for the task, serialized.

  • args::Union{String, Vector{UInt8}}: The arguments for the task, serialized.

  • kwargs::Union{String, Vector{UInt8}}: The keyword arguments for the task, serialized.

  • future::Union{String, Vector{UInt8}}: The task's serialized DeferredFuture.

source
release_key(worker::Worker; key::String="", cause::String="", reason::String="")

Delete a key and its data.

source
release_dep(worker::Worker, dep::String)

Delete a dependency key and its data.

source
ensure_computing(worker::Worker)

Make sure the worker is computing available tasks.

source
execute(worker::Worker, key::String)

Execute the task identified by key.

source
put_key_in_memory(worker::Worker, key::String, value; should_transition::Bool=true)

Store the result (value) of the task identified by key.

source
ensure_communicating(worker::Worker)

Ensure the worker is communicating with its peers to gather dependencies as needed.

source
gather_dep(worker::Worker, worker_addr::String, dep::String, deps::Set; cause::String="")

Gather the dependency with identifier "dep" from worker_addr.

source
handle_missing_dep(worker::Worker, deps::Set{String})

Handle a missing dependency that can't be found on any peers.

source
update_who_has(worker::Worker, who_has::Dict{String, Vector{String}})

Ensure who_has is up to date and accurate.

source
select_keys_for_gather(worker::Worker, worker_addr::String, dep::String)

Select which keys to gather from peer at worker_addr.

source
gather_from_workers(who_has::Dict, connection_pool::ConnectionPool) -> Tuple

Gather data directly from who_has peers.

source
transition(worker::Worker, key::String, finish_state::Symbol; kwargs...)

Transition task with identifier key to finish_state from its current state.

source
transition_dep(worker::Worker, dep::String, finish_state::Symbol; kwargs...)

Transition the dependency with identifier dep to finish_state from its current state.

source
send_task_state_to_scheduler(worker::Worker, key::String)

Send the state of task key to the scheduler.

source
deserialize_task(func, args, kwargs, future) -> Tuple

Deserialize task inputs and regularize to func, args, kwargs.

Returns

  • Tuple: The deserialized function, arguments, keyword arguments, and DeferredFuture for the task.

source
apply_function(key::String, func::Base.Callable, args::Any, kwargs::Any)

Run a function and return collected information.

source