Communication
All communication between the Julia client, the workers, and the scheduler uses the MsgPack protocol as specified by the dask-scheduler. Workers also use it to communicate among themselves and to gather dependencies. All communication goes over TCP connections. Julia functions, arguments, and keyword arguments are serialized before being sent. Workers and clients should all belong to the same Julia cluster; otherwise they will not be able to communicate properly.
API
(For Internal Use)
DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(sock::TCPSocket, msg::Dict{String, T})
Send a message and wait for the response.

DaskDistributedDispatcher.send_msg — Method.
send_msg(sock::TCPSocket, msg::Union{String, Array, Dict})
Send msg to sock serialized by MsgPack following the dask.distributed protocol.
DaskDistributedDispatcher.recv_msg — Method.
recv_msg(sock::TCPSocket) -> Union{String, Array, Dict}
Receive msg from sock and deserialize it from MsgPack-encoded bytes to strings.

DaskDistributedDispatcher.close_comm — Method.
close_comm(comm::TCPSocket)
Tell the peer to close and then close the TCPSocket comm.
DaskDistributedDispatcher.read_msg — Method.
read_msg(msg::Any)
Convert msg from bytes to strings except for serialized parts.

DaskDistributedDispatcher.to_serialize — Method.
to_serialize(item) -> Vector{UInt8}
Serialize item.

DaskDistributedDispatcher.to_deserialize — Method.
to_deserialize(serialized_item::Vector{UInt8}) -> Any
Parse and deserialize serialized_item.
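The round trip described by to_serialize and to_deserialize can be sketched with Julia's standard Serialization library. This is a minimal illustration in current Julia syntax, not the package's implementation: serialize_to_bytes and deserialize_from_bytes are hypothetical names, and whether the package uses this exact mechanism is an assumption.

```julia
using Serialization

# Hypothetical stand-in for to_serialize: turn any Julia value into raw bytes.
function serialize_to_bytes(item)::Vector{UInt8}
    io = IOBuffer()
    serialize(io, item)
    return take!(io)
end

# Hypothetical stand-in for to_deserialize: rebuild the value from those bytes.
deserialize_from_bytes(bytes::Vector{UInt8}) = deserialize(IOBuffer(bytes))

# A task is a function plus its arguments; both survive the round trip.
task = (+, (1, 2, 3))
bytes = serialize_to_bytes(task)
f, args = deserialize_from_bytes(bytes)
result = f(args...)
```

The byte vector is what would travel inside a MsgPack message; the receiving side needs the same function definitions available for deserialization to succeed.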
DaskDistributedDispatcher.pack_data — Method.
pack_data(object, data::Dict{String, Any})
Merge known data into object.

DaskDistributedDispatcher.pack_object — Method.
pack_object(object, data::Dict{String, Any})
Replace a DispatchNode's key with its result only if object is a known key.

DaskDistributedDispatcher.unpack_data — Method.
unpack_data(object)
Unpack DispatchNode objects from object. Returns the unpacked object.
DaskDistributedDispatcher.unpack_object — Method.
unpack_object(object)
Replace object with its key if object is a DispatchNode; otherwise return the original object.
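The pack/unpack pairs above can be pictured as two mirror-image substitutions: unpacking swaps nodes for their keys before sending, and packing swaps known keys back for their results. The sketch below is illustrative only. Node is a hypothetical stand-in for a DispatchNode, the function names are invented, and the recursion through arrays and dictionaries is an assumption about how containers are handled.

```julia
# Hypothetical stand-in for a DispatchNode: only its key matters here.
struct Node
    key::String
end

# Replace a node with its key; leave anything else unchanged.
unpack_one(x) = x isa Node ? x.key : x

# Recurse through arrays and dictionary values (assumed container handling).
unpack_all(x::AbstractArray) = Any[unpack_all(v) for v in x]
unpack_all(x::AbstractDict) = Dict{Any,Any}(k => unpack_all(v) for (k, v) in x)
unpack_all(x) = unpack_one(x)

# Replace a string with its stored result only if it is a known key.
pack_one(x, data::Dict{String,Any}) = (x isa String && haskey(data, x)) ? data[x] : x

pack_all(x::AbstractArray, data) = Any[pack_all(v, data) for v in x]
pack_all(x::AbstractDict, data) = Dict{Any,Any}(k => pack_all(v, data) for (k, v) in x)
pack_all(x, data) = pack_one(x, data)

args = Any[Node("a"), 2, Dict("x" => Node("b"))]
keys_only = unpack_all(args)                      # nodes become their keys
known = Dict{String,Any}("a" => 10, "b" => 20)    # results already computed
filled = pack_all(keys_only, known)               # keys become results
```

Values that are not known keys, such as the literal 2 above, pass through both directions untouched.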
Server
(For Internal Use)
DaskDistributedDispatcher.Server — Type.
Server
Abstract type to listen for and handle incoming messages.
start_listening(server::Server; handler::Function=handle_comm)
Listen for incoming connections on a port and dispatch them to be handled.
DaskDistributedDispatcher.handle_comm — Method.
handle_comm(server::Server, comm::TCPSocket)
Listen for incoming messages on an established connection.
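The listen-and-dispatch pattern described for Server can be sketched with the standard Sockets library. This is a simplified illustration under stated assumptions: listen_loop and handle_conn are hypothetical names, messages are newline-delimited text rather than MsgPack frames, and the port hint is arbitrary.

```julia
using Sockets

# Hypothetical handler: read newline-delimited requests and acknowledge each one.
function handle_conn(sock::TCPSocket)
    try
        while isopen(sock) && !eof(sock)
            line = readline(sock)
            println(sock, "ack: ", line)
        end
    finally
        close(sock)
    end
end

# Accept connections until the server is closed; each one gets its own task.
function listen_loop(server::Sockets.TCPServer, handler::Function=handle_conn)
    while isopen(server)
        sock = try
            accept(server)
        catch
            break   # accept throws once the server has been closed
        end
        @async handler(sock)
    end
end

# listenany picks the first free port at or above the hint.
port, server = listenany(ip"127.0.0.1", 8786)
@async listen_loop(server)

client = connect(ip"127.0.0.1", port)
println(client, "ping")
reply = readline(client)
close(client)
close(server)
```

Running each connection in its own task keeps one slow peer from blocking the accept loop, which is the reason for separating listening from handling.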
Rpc
(For Internal Use)
DaskDistributedDispatcher.Rpc — Type.
Rpc
Manage open socket connections to a specific address.

DaskDistributedDispatcher.Rpc — Method.
Rpc(address::Address) -> Rpc
Manage, open, and reuse socket connections to a specific address as required.

DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(rpc::Rpc, msg::Dict{String, T})
Send msg and wait for a response.

DaskDistributedDispatcher.start_comm — Method.
start_comm(rpc::Rpc) -> TCPSocket
Start a new socket connection.
DaskDistributedDispatcher.get_comm — Method.
get_comm(rpc::Rpc) -> TCPSocket
Reuse a previously opened connection if one is available; otherwise start a new one.
Base.close — Method.
Base.close(rpc::Rpc)
Close all communications.
ConnectionPool
(For Internal Use)
ConnectionPool
Manage a pool with a limited number of TCPSocket connections to different addresses. The default number of open connections allowed is 50, matching the constructor's limit argument.
DaskDistributedDispatcher.ConnectionPool — Method.
ConnectionPool(limit::Integer=50) -> ConnectionPool
Return a new ConnectionPool which limits the total possible number of connections open to limit.

DaskDistributedDispatcher.send_recv — Method.
send_recv{T}(pool::ConnectionPool, address::Address, msg::Dict{String, T})
Send msg to address and wait for a response.
Returns
Union{String, Array, Dict}: the reply received from address
DaskDistributedDispatcher.get_comm — Method.
get_comm(pool::ConnectionPool, address::Address)
Get a TCPSocket connection to the given address.

DaskDistributedDispatcher.reuse — Method.
reuse(pool::ConnectionPool, address::Address, comm::TCPSocket)
Reuse an open communication to the given address.

DaskDistributedDispatcher.collect_comms — Method.
collect_comms(pool::ConnectionPool)
Collect open but unused communications to allow opening other ones.

Base.close — Method.
Base.close(pool::ConnectionPool)
Close all communications.
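The interplay of get_comm, reuse, and collect_comms can be sketched as a small pool that tracks idle connections per address and the number currently checked out. Everything here is hypothetical: Pool, checkout!, checkin!, and collect_idle! are invented names, addresses are plain strings, and connections are stand-in values produced by a caller-supplied function instead of real TCPSocket objects.

```julia
# A minimal pool: idle connections per address plus a count of those in use.
mutable struct Pool
    limit::Int
    open_conn::Function                  # address -> new connection (hypothetical)
    idle::Dict{String,Vector{Any}}
    active::Int
end

Pool(open_conn; limit=50) = Pool(limit, open_conn, Dict{String,Vector{Any}}(), 0)

# Open connections are those in use plus those sitting idle.
total_open(p::Pool) = p.active + sum(Int[length(v) for v in values(p.idle)])

# Drop idle connections to make room (a real pool would also close them).
function collect_idle!(p::Pool)
    for conns in values(p.idle)
        empty!(conns)
    end
end

# Prefer an idle connection to the same address; otherwise open a new one.
function checkout!(p::Pool, addr::String)
    idle = get!(p.idle, addr, Any[])
    if !isempty(idle)
        p.active += 1
        return pop!(idle)
    end
    total_open(p) >= p.limit && collect_idle!(p)
    total_open(p) >= p.limit && error("connection limit reached")
    p.active += 1
    return p.open_conn(addr)
end

# Hand a connection back so a later checkout to the same address can reuse it.
function checkin!(p::Pool, addr::String, conn)
    p.active -= 1
    push!(get!(p.idle, addr, Any[]), conn)
end

opened = Ref(0)
pool = Pool(addr -> (opened[] += 1; "conn-$(opened[])-to-$addr"); limit=2)

c1 = checkout!(pool, "tcp://a:1")
checkin!(pool, "tcp://a:1", c1)
c2 = checkout!(pool, "tcp://a:1")    # reused, nothing new is opened
checkin!(pool, "tcp://a:1", c2)
c3 = checkout!(pool, "tcp://b:2")    # second open connection, limit reached
c4 = checkout!(pool, "tcp://c:3")    # idle connection to a:1 is collected first
```

Collecting only idle connections means the limit never forces a connection that is mid-request to close; if every connection is in use, the checkout fails instead.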
BatchedSend
(For Internal Use)
BatchedSend
Batch messages on a stream. Sending several messages at once improves performance when there are many small messages. Used by both the Julia worker and client to communicate with the scheduler.
DaskDistributedDispatcher.BatchedSend — Method.
BatchedSend(comm::TCPSocket; interval::AbstractFloat=0.002) -> BatchedSend
Batch messages on comm. The buffered list of messages is sent once every interval seconds (0.002 s, i.e. 2 ms, by default).

background_send(batchedsend::BatchedSend)
Send the messages in batchedsend.buffer once every interval seconds.
DaskDistributedDispatcher.send_msg — Method.
send_msg{T}(batchedsend::BatchedSend, msg::Dict{String, T})
Schedule a message for sending to the other side. This completes quickly and synchronously.

Base.close — Method.
Base.close(batchedsend::BatchedSend)
Try to send all remaining messages and then close the connection.
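The batching behaviour described above can be sketched as a buffer that is queued into cheaply and flushed as a single list. This is an illustration, not the package's code: Batcher, queue_msg!, flush!, background_flush, and close_batcher! are hypothetical names, and the sent field stands in for the socket by recording each batch that would have been written.

```julia
mutable struct Batcher
    buffer::Vector{Any}          # messages waiting to be sent
    interval::Float64            # seconds between flushes, as passed to sleep
    sent::Vector{Vector{Any}}    # stand-in for the socket: one entry per batch
    running::Bool
end

Batcher(; interval=0.002) = Batcher(Any[], interval, Vector{Any}[], true)

# Queue a message; this only appends to the buffer, so it returns immediately.
queue_msg!(b::Batcher, msg) = (push!(b.buffer, msg); nothing)

# Write everything queued so far as one batch, then clear the buffer.
function flush!(b::Batcher)
    isempty(b.buffer) && return
    push!(b.sent, copy(b.buffer))
    empty!(b.buffer)
    return
end

# Background loop: flush once per interval until closed.
# In real use this would be started with `@async background_flush(b)`.
function background_flush(b::Batcher)
    while b.running
        flush!(b)
        sleep(b.interval)
    end
end

# Stop the loop and try to send whatever is still queued.
function close_batcher!(b::Batcher)
    b.running = false
    flush!(b)
end

b = Batcher()
for i in 1:3
    queue_msg!(b, Dict("op" => "update", "n" => i))
end
flush!(b)                                  # one batch carrying all three messages
queue_msg!(b, Dict("op" => "close"))
close_batcher!(b)                          # the final message goes out on close
```

The usage above calls flush! by hand so the result is deterministic; with the background task running, the same three messages would typically still leave as one batch because they are queued well within a single interval.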