Communication
All communication between the Julia client and workers and the scheduler is sent using the MsgPack protocol as specified by the dask-scheduler. Workers also use this protocol to communicate among themselves and gather dependencies. TCP connections are used for all communication. Julia functions, arguments, and keyword arguments are serialized before being sent. Workers and Clients should all belong to the same Julia cluster; otherwise they will not be able to communicate properly.
API
(For Internal Use)
DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(sock::TCPSocket, msg::Dict{String, T})
Send a message and wait for the response.
DaskDistributedDispatcher.send_msg — Method.
send_msg(sock::TCPSocket, msg::Union{String, Array, Dict})
Send msg to sock, serialized by MsgPack following the dask.distributed protocol.
DaskDistributedDispatcher.recv_msg — Method.
recv_msg(sock::TCPSocket) -> Union{String, Array, Dict}
Receive msg from sock and deserialize it from MsgPack-encoded bytes to strings.
DaskDistributedDispatcher.close_comm — Method.
close_comm(comm::TCPSocket)
Tell the peer to close and then close the TCPSocket comm.
DaskDistributedDispatcher.read_msg — Method.
read_msg(msg::Any)
Convert msg from bytes to strings, except for serialized parts.
DaskDistributedDispatcher.to_serialize — Method.
to_serialize(item) -> Vector{UInt8}
Serialize item.
DaskDistributedDispatcher.to_deserialize — Method.
to_deserialize(serialized_item::Vector{UInt8}) -> Any
Parse and deserialize serialized_item.
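The round trip described by to_serialize and to_deserialize can be illustrated with Julia's Serialization standard library. This is only a sketch under assumptions: it targets Julia 1.x, the names sketch_serialize and sketch_deserialize are hypothetical, and the package's own functions may encode items differently.

```julia
using Serialization

# Hypothetical stand-ins for to_serialize / to_deserialize (not the package's code).
function sketch_serialize(item)::Vector{UInt8}
    io = IOBuffer()
    serialize(io, item)
    return take!(io)
end

sketch_deserialize(bytes::Vector{UInt8}) = deserialize(IOBuffer(bytes))

# Round-trip a function together with its arguments and keyword arguments.
task = (func = +, args = (1, 2), kwargs = Dict{Symbol,Any}())
bytes = sketch_serialize(task)
restored = sketch_deserialize(bytes)
result = restored.func(restored.args...; restored.kwargs...)
```

Both sides need compatible Julia sessions for this kind of byte stream to be readable, which is consistent with the requirement that Workers and Clients belong to the same Julia cluster.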
DaskDistributedDispatcher.pack_data — Method.
pack_data(object, data::Dict{String, Any})
Merge known data into object.
DaskDistributedDispatcher.pack_object — Method.
pack_object(object, data::Dict{String, Any})
Replace a DispatchNode's key with its result only if object is a known key.
DaskDistributedDispatcher.unpack_data — Method.
unpack_data(object)
Unpack DispatchNode objects from object. Returns the unpacked object.
DaskDistributedDispatcher.unpack_object — Method.
unpack_object(object)
Replace object with its key if object is a DispatchNode; otherwise return the original object.
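The idea behind merging known data into an object can be sketched as a recursive substitution: any string that is a known key is replaced by its result, and everything else is left alone. The function sketch_pack below is hypothetical and only covers vectors and tuples; it is not the package's implementation.

```julia
# Hypothetical sketch: replace known keys inside nested containers with their results.
sketch_pack(object, data::Dict{String,Any}) = object                      # leave other values alone
sketch_pack(key::String, data::Dict{String,Any}) = get(data, key, key)   # substitute only known keys
sketch_pack(xs::AbstractVector, data::Dict{String,Any}) = [sketch_pack(x, data) for x in xs]
sketch_pack(xs::Tuple, data::Dict{String,Any}) = map(x -> sketch_pack(x, data), xs)

data = Dict{String,Any}("node-1" => 10, "node-2" => 20)
packed = sketch_pack(("node-1", [1, "node-2"], "unknown"), data)
```

Here the string "unknown" is not a key in data, so it passes through unchanged, mirroring the "only if object is a known key" rule.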
Server
(For Internal Use)
DaskDistributedDispatcher.Server — Type.
Server
Abstract type to listen for and handle incoming messages.
start_listening(server::Server; handler::Function=handle_comm)
Listen for incoming connections on a port and dispatch them to be handled.
DaskDistributedDispatcher.handle_comm — Method.
handle_comm(server::Server, comm::TCPSocket)
Listen for incoming messages on an established connection.
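The listen-and-dispatch pattern described for Server can be sketched with the Sockets standard library. Everything named sketch_* is hypothetical, the handler simply echoes lines in upper case instead of decoding MsgPack messages, and Julia 1.x is assumed.

```julia
using Sockets

# Hypothetical handler: echo each received line back in upper case.
function sketch_handle_comm(comm::TCPSocket)
    try
        while isopen(comm) && !eof(comm)
            line = readline(comm)
            println(comm, uppercase(line))
        end
    finally
        close(comm)
    end
end

# Accept connections in the background and hand each one to its own task.
function sketch_start_listening(listener::Sockets.TCPServer; handler::Function=sketch_handle_comm)
    return @async begin
        try
            while true
                comm = accept(listener)
                @async handler(comm)
            end
        catch
            # accept throws once the listener is closed; treat that as shutdown.
            isopen(listener) && rethrow()
        end
    end
end

# Pick any free port at or above the hint, then talk to the sketch server.
port, listener = listenany(ip"127.0.0.1", 8000)
sketch_start_listening(listener)

client = connect(ip"127.0.0.1", port)
println(client, "ping")
reply = readline(client)
close(client)
close(listener)
```

Running each connection in its own task keeps one slow peer from blocking the accept loop.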
Rpc
(For Internal Use)
DaskDistributedDispatcher.Rpc — Type.
Rpc
Manage open socket connections to a specific address.
DaskDistributedDispatcher.Rpc — Method.
Rpc(address::Address) -> Rpc
Manage, open, and reuse socket connections to a specific address as required.
DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(rpc::Rpc, msg::Dict{String, T})
Send msg and wait for a response.
DaskDistributedDispatcher.start_comm — Method.
start_comm(rpc::Rpc) -> TCPSocket
Start a new socket connection.
DaskDistributedDispatcher.get_comm — Method.
get_comm(rpc::Rpc) -> TCPSocket
Reuse a previously opened connection if one is available; otherwise start a new one.
Base.close — Method.
Base.close(rpc::Rpc)
Close all communications.
ConnectionPool
(For Internal Use)
ConnectionPool
Manage a limited pool of TCPSocket connections to different addresses. Default number of open connections allowed is 512.
DaskDistributedDispatcher.ConnectionPool — Method.
ConnectionPool(limit::Integer=50) -> ConnectionPool
Return a new ConnectionPool that limits the total number of open connections to limit.
DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(pool::ConnectionPool, address::Address, msg::Dict{String, T})
Send msg to address and wait for a response.
Returns
Union{String, Array, Dict}: the reply received from address
DaskDistributedDispatcher.get_comm — Method.
get_comm(pool::ConnectionPool, address::Address)
Get a TCPSocket connection to the given address.
DaskDistributedDispatcher.reuse — Method.
reuse(pool::ConnectionPool, address::Address, comm::TCPSocket)
Reuse an open communication to the given address.
DaskDistributedDispatcher.collect_comms — Method.
collect_comms(pool::ConnectionPool)
Collect open but unused communications to allow opening other ones.
Base.close — Method.
Base.close(pool::ConnectionPool)
Close all communications.
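How get_comm, reuse, and collect_comms might fit together can be sketched with a small pool that tracks idle connections per address. This is a simplified illustration, not the package's code: SketchPool and the sketch_* functions are hypothetical, addresses are plain strings, open_comm stands in for opening a TCPSocket, and the sketch raises an error at the limit where a real pool would more plausibly wait.

```julia
# Hypothetical pool sketch; `open_comm` stands in for opening a TCPSocket.
mutable struct SketchPool{C}
    limit::Int
    open_comm::Function
    available::Dict{String,Vector{C}}   # idle connections per address
    active::Int                         # connections currently handed out
end

SketchPool{C}(limit::Integer, open_comm::Function) where {C} =
    SketchPool{C}(limit, open_comm, Dict{String,Vector{C}}(), 0)

n_idle(pool::SketchPool) = mapreduce(length, +, values(pool.available); init=0)

# Close idle connections so that new ones can be opened.
function sketch_collect!(pool::SketchPool)
    for comms in values(pool.available)
        foreach(close, comms)
        empty!(comms)
    end
end

# Hand out an idle connection to `address`, or open a new one within the limit.
function sketch_get_comm!(pool::SketchPool{C}, address::String) where {C}
    idle = get!(pool.available, address, C[])
    if isempty(idle)
        pool.active + n_idle(pool) >= pool.limit && sketch_collect!(pool)
        pool.active >= pool.limit && error("connection limit reached")
        comm = pool.open_comm(address)
    else
        comm = pop!(idle)
    end
    pool.active += 1
    return comm
end

# Return a connection to the pool so that a later request can reuse it.
function sketch_reuse!(pool::SketchPool{C}, address::String, comm::C) where {C}
    pool.active -= 1
    push!(get!(pool.available, address, C[]), comm)
    return nothing
end

# Usage with IOBuffer as a stand-in connection type.
opened = String[]
pool = SketchPool{IOBuffer}(2, addr -> (push!(opened, addr); IOBuffer()))
a = sketch_get_comm!(pool, "tcp://127.0.0.1:8786")
sketch_reuse!(pool, "tcp://127.0.0.1:8786", a)
b = sketch_get_comm!(pool, "tcp://127.0.0.1:8786")   # reuses `a`, no new connection
```

Counting idle and active connections together against the limit is one possible policy; it lets idle connections be reclaimed before a request is refused.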
BatchedSend
(For Internal Use)
BatchedSend
Send messages in batches on a stream. Batching several messages at once helps performance when sending many small messages. Used by both the Julia worker and client to communicate with the scheduler.
DaskDistributedDispatcher.BatchedSend — Method.
BatchedSend(comm::TCPSocket; interval::AbstractFloat=0.002) -> BatchedSend
Batch messages on comm. Lists of messages are sent every interval milliseconds.
background_send(batchedsend::BatchedSend)
Send the messages in batchedsend.buffer every interval milliseconds.
DaskDistributedDispatcher.send_msg — Method.
send_msg{T}(batchedsend::BatchedSend, msg::Dict{String, T})
Schedule a message for sending to the other side. This completes quickly and synchronously.
Base.close — Method.
Base.close(batchedsend::BatchedSend)
Try to send all remaining messages and then close the connection.
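The batching behaviour can be sketched as a buffer plus a background task that flushes it periodically. The names below are hypothetical, send stands in for writing one batch to the TCPSocket, and the sketch passes interval directly to sleep, which takes seconds; that unit is an assumption of the sketch rather than a statement about the package.

```julia
# Hypothetical batching sketch; `send` stands in for writing one batch to a socket.
mutable struct SketchBatchedSend
    send::Function
    interval::Float64      # pause between flushes, passed to sleep (seconds, an assumption)
    buffer::Vector{Any}
    please_stop::Bool
end

function SketchBatchedSend(send::Function; interval::AbstractFloat=0.002)
    batched = SketchBatchedSend(send, interval, Any[], false)
    @async sketch_background_send(batched)
    return batched
end

# Send everything currently buffered as a single batch.
function sketch_flush!(batched::SketchBatchedSend)
    isempty(batched.buffer) && return nothing
    payload = copy(batched.buffer)
    empty!(batched.buffer)
    batched.send(payload)
    return nothing
end

# Background loop: flush, then sleep for `interval`, until asked to stop.
function sketch_background_send(batched::SketchBatchedSend)
    while !batched.please_stop
        sketch_flush!(batched)
        sleep(batched.interval)
    end
end

# Queue a message; this only appends to the buffer, so it returns immediately.
sketch_send_msg(batched::SketchBatchedSend, msg) = (push!(batched.buffer, msg); nothing)

# Stop the background loop and send whatever is still buffered.
function sketch_close(batched::SketchBatchedSend)
    batched.please_stop = true
    sketch_flush!(batched)
end

# Usage: collect the batches in a vector instead of writing to a socket.
batches = Vector{Any}[]
batched = SketchBatchedSend(payload -> push!(batches, payload); interval=0.01)
for i in 1:3
    sketch_send_msg(batched, Dict("op" => "ping", "id" => i))
end
sketch_close(batched)
```

Because queuing a message never touches the socket, callers are not slowed down by network latency; the cost is a delay of up to one interval before a message leaves.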