Communication

All communication between the Julia client, the workers, and the scheduler uses the MsgPack protocol as specified by the dask-scheduler. Workers also use it to communicate among themselves and to gather dependencies. All communication runs over TCP connections. Julia functions, arguments, and keyword arguments are serialized before being sent. Workers and clients must all belong to the same Julia cluster, or they will not be able to communicate properly.

API

(For Internal Use)

send_recv{T}(sock::TCPSocket, msg::Dict{String, T})

Send a message and wait for the response.

send_msg(sock::TCPSocket, msg::Union{String, Array, Dict})

Send msg to sock serialized by MsgPack following the dask.distributed protocol.

recv_msg(sock::TCPSocket) -> Union{String, Array, Dict}

Receive msg from sock and deserialize it from MsgPack-encoded bytes to strings.
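
As a rough illustration of how send_msg and recv_msg pair up, the sketch below writes length-prefixed messages to an in-memory buffer and reads them back. It is not the package's implementation: it swaps MsgPack for Julia's Serialization standard library so that it runs without extra packages, the 8-byte length prefix is an assumed framing, and the sketch_ names are hypothetical.

```julia
using Serialization

# Hypothetical stand-in for send_msg: encode msg, then write an 8-byte
# little-endian length followed by the payload (assumed framing).
function sketch_send_msg(io::IO, msg)
    payload = IOBuffer()
    serialize(payload, msg)
    bytes = take!(payload)
    write(io, htol(UInt64(length(bytes))))
    write(io, bytes)
    return nothing
end

# Hypothetical stand-in for recv_msg: read the length, then exactly that
# many bytes, and decode them.
function sketch_recv_msg(io::IO)
    n = ltoh(read(io, UInt64))
    bytes = read(io, Int(n))
    return deserialize(IOBuffer(bytes))
end

# An IOBuffer stands in for the TCPSocket so the sketch needs no network.
io = IOBuffer()
sketch_send_msg(io, Dict("greeting" => "hello"))
sketch_send_msg(io, ["a", "b"])
seekstart(io)
first_msg = sketch_recv_msg(io)
second_msg = sketch_recv_msg(io)
```

The length prefix is what lets the reader split a continuous byte stream back into the individual messages that were written.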

close_comm(comm::TCPSocket)

Tell the peer to close and then close the TCPSocket comm.

read_msg(msg::Any)

Convert msg from bytes to strings except for serialized parts.

to_serialize(item) -> Vector{UInt8}

Serialize item.

to_deserialize(serialized_item::Vector{UInt8}) -> Any

Parse and deserialize serialized_item.
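
The round trip through to_serialize and to_deserialize can be pictured with the sketch below. It assumes the Serialization standard library as the underlying mechanism, which this page does not confirm, and the sketch_ functions and the task layout are hypothetical.

```julia
using Serialization

# Hypothetical stand-in for to_serialize: any item to raw bytes.
function sketch_serialize(item)::Vector{UInt8}
    io = IOBuffer()
    serialize(io, item)
    return take!(io)
end

# Hypothetical stand-in for to_deserialize: raw bytes back to a value.
sketch_deserialize(bytes::Vector{UInt8}) = deserialize(IOBuffer(bytes))

# Assumed task layout: a function plus positional and keyword arguments.
task = (func = round, args = (3.14159,), kwargs = Dict{Symbol,Any}(:digits => 2))

bytes = sketch_serialize(task)
restored = sketch_deserialize(bytes)

# Run the restored task as the receiving side might.
result = restored.func(restored.args...; restored.kwargs...)
```

Named functions such as round are serialized by reference to their definition, which is one reason the sending and receiving processes need compatible Julia environments.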

pack_data(object, data::Dict{String, Any})

Merge known data into object.

pack_object(object, data::Dict{String, Any})

Replace a DispatchNode's key with its result only if object is a known key.

unpack_data(object)

Unpack DispatchNode objects from object. Returns the unpacked object.

unpack_object(object)

Replace object with its key if object is a DispatchNode; otherwise return the original object.
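
The four pack and unpack functions above describe a recursive substitution in both directions. A minimal sketch, assuming nodes are identified by a string key and that only tuples, vectors, and dictionaries are traversed, might look like this. SketchNode and all sketch_ names are hypothetical and do not come from the package.

```julia
# Hypothetical stand-in for a DispatchNode: only the key matters here.
struct SketchNode
    key::String
end

# Replace a node with its key; leave anything else alone.
sketch_unpack_object(x) = x isa SketchNode ? x.key : x

# Walk nested containers and unpack every node found.
sketch_unpack_data(x) = sketch_unpack_object(x)
sketch_unpack_data(x::Union{Tuple,AbstractVector}) = map(sketch_unpack_data, x)
sketch_unpack_data(x::AbstractDict) = Dict(k => sketch_unpack_data(v) for (k, v) in x)

# Replace a known key with its stored result; unknown values pass through.
sketch_pack_object(x, data::Dict{String,Any}) = x
sketch_pack_object(x::String, data::Dict{String,Any}) = get(data, x, x)

# Walk nested containers and substitute every known key.
sketch_pack_data(x, data::Dict{String,Any}) = sketch_pack_object(x, data)
sketch_pack_data(x::Union{Tuple,AbstractVector}, data::Dict{String,Any}) =
    map(v -> sketch_pack_data(v, data), x)
sketch_pack_data(x::AbstractDict, data::Dict{String,Any}) =
    Dict(k => sketch_pack_data(v, data) for (k, v) in x)

args = (SketchNode("a"), 2, [SketchNode("b")])
unpacked = sketch_unpack_data(args)          # nodes become their keys

results = Dict{String,Any}("a" => 10, "b" => 20)
packed = sketch_pack_data(unpacked, results) # keys become their results
```

Unpacking is what makes a task's arguments cheap to send, since only keys travel over the wire, and packing restores real values once the dependencies have been gathered.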

Server

(For Internal Use)

Server

Abstract type to listen for and handle incoming messages.

start_listening(server::Server; handler::Function=handle_comm)

Listen for incoming connections on a port and dispatch them to be handled.

handle_comm(server::Server, comm::TCPSocket)

Listen for incoming messages on an established connection.

Rpc

(For Internal Use)

Rpc

Manage open socket connections to a specific address.

Rpc(address::Address) -> Rpc

Manage, open, and reuse socket connections to a specific address as required.

send_recv{T}(rpc::Rpc, msg::Dict{String, T})

Send msg and wait for a response.

start_comm(rpc::Rpc) -> TCPSocket

Start a new socket connection.

get_comm(rpc::Rpc) -> TCPSocket

Reuse a previously opened connection if one is available; otherwise start a new one.
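
The reuse-or-open behaviour of get_comm can be sketched as follows. This is an illustration only: connections are plain integers so that it runs without a network, and SketchRpc, sketch_release!, and the example address are hypothetical.

```julia
# Hypothetical stand-in for Rpc. Real code would hold TCPSocket objects.
mutable struct SketchRpc
    address::String
    idle::Vector{Int}    # connections that are open but not in use
    opened::Int          # how many connections have been started so far
end

SketchRpc(address::String) = SketchRpc(address, Int[], 0)

# Always open a fresh connection.
function sketch_start_comm(rpc::SketchRpc)
    rpc.opened += 1
    return rpc.opened
end

# Prefer an idle connection; fall back to opening a new one.
sketch_get_comm(rpc::SketchRpc) =
    isempty(rpc.idle) ? sketch_start_comm(rpc) : pop!(rpc.idle)

# Hand a connection back once a request/response pair has completed.
sketch_release!(rpc::SketchRpc, comm::Int) = push!(rpc.idle, comm)

rpc = SketchRpc("tcp://127.0.0.1:8786")
first_comm = sketch_get_comm(rpc)     # nothing idle, so a connection is opened
sketch_release!(rpc, first_comm)
second_comm = sketch_get_comm(rpc)    # the idle connection is reused
third_comm = sketch_get_comm(rpc)     # nothing idle again, so open another
```

Reusing an idle connection avoids paying the TCP setup cost for every message sent to the same address.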

Base.close(rpc::Rpc)

Close all communications.

ConnectionPool

(For Internal Use)

ConnectionPool

Manage a limited pool of TCPSocket connections to different addresses. The default limit on open connections is 50, matching the constructor's limit::Integer=50.

ConnectionPool(limit::Integer=50) -> ConnectionPool

Return a new ConnectionPool which limits the total possible number of connections open to limit.

send_recv{T}(pool::ConnectionPool, address::Address, msg::Dict{String, T})

Send msg to address and wait for a response.

Returns

  • Union{String, Array, Dict}: the reply received from address

get_comm(pool::ConnectionPool, address::Address)

Get a TCPSocket connection to the given address.

reuse(pool::ConnectionPool, address::Address, comm::TCPSocket)

Reuse an open communication to the given address.

collect_comms(pool::ConnectionPool)

Collect open but unused communications to allow opening other ones.
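
How get_comm, reuse, and collect_comms might interact under a connection limit is sketched below. It is a simplified model, not the package's code: connections are integer ids, the pool raises an error where a real pool would more likely wait, and SketchPool and the sketch_ functions are hypothetical.

```julia
# Hypothetical stand-in for ConnectionPool; connections are integer ids.
mutable struct SketchPool
    limit::Int
    available::Dict{String,Vector{Int}}   # open and idle, per address
    occupied::Dict{String,Vector{Int}}    # open and in use, per address
    next_id::Int
end

SketchPool(limit::Integer=50) =
    SketchPool(limit, Dict{String,Vector{Int}}(), Dict{String,Vector{Int}}(), 1)

count_comms(d) = sum(Int[length(v) for v in values(d)])
open_count(pool::SketchPool) = count_comms(pool.available) + count_comms(pool.occupied)

# Close every idle connection to make room for new ones.
function sketch_collect_comms!(pool::SketchPool)
    foreach(empty!, values(pool.available))
    return pool
end

# Take an idle connection to address, or open one if the limit allows.
function sketch_get_comm!(pool::SketchPool, address::String)
    idle = get!(pool.available, address, Int[])
    busy = get!(pool.occupied, address, Int[])
    if !isempty(idle)
        comm = pop!(idle)
    else
        open_count(pool) >= pool.limit && sketch_collect_comms!(pool)
        open_count(pool) >= pool.limit && error("connection limit reached")
        comm = pool.next_id
        pool.next_id += 1
    end
    push!(busy, comm)
    return comm
end

# Mark a connection as idle so later requests can pick it up.
function sketch_reuse!(pool::SketchPool, address::String, comm::Int)
    filter!(c -> c != comm, pool.occupied[address])
    push!(pool.available[address], comm)
    return pool
end

pool = SketchPool(2)
a = sketch_get_comm!(pool, "worker-a")
b = sketch_get_comm!(pool, "worker-b")
sketch_reuse!(pool, "worker-a", a)        # a is now idle
c = sketch_get_comm!(pool, "worker-c")    # at the limit: idle a is collected first
```

Collecting only idle connections means in-flight requests are never interrupted to make room for a new address.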

Base.close(pool::ConnectionPool)

Close all communications.

BatchedSend

(For Internal Use)

BatchedSend

Batch messages on a stream. Sending several messages at once helps performance when there are many small messages. Used by both the Julia worker and client to communicate with the scheduler.

BatchedSend(comm::TCPSocket; interval::AbstractFloat=0.002) -> BatchedSend

Batch messages on comm. Lists of messages are sent every interval seconds (the default, 0.002, is 2 ms).

background_send(batchedsend::BatchedSend)

Send the messages in batchedsend.buffer every interval seconds.

send_msg{T}(batchedsend::BatchedSend, msg::Dict{String, T})

Schedule a message for sending to the other side. This completes quickly and synchronously.
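
The batching idea can be sketched as a buffer plus a timer task. The version below is a simplified model rather than the package's implementation: a vector of batches stands in for the socket, interval is treated as seconds because that is what Julia's sleep expects, and SketchBatchedSend and the sketch_ functions are hypothetical.

```julia
# Hypothetical stand-in for BatchedSend.
mutable struct SketchBatchedSend
    interval::Float64                         # seconds between flushes
    buffer::Vector{Dict{String,Any}}          # messages waiting to be sent
    sent::Vector{Vector{Dict{String,Any}}}    # stand-in socket: one entry per batch
    is_open::Bool
end

SketchBatchedSend(; interval::AbstractFloat=0.002) =
    SketchBatchedSend(interval, Dict{String,Any}[], Vector{Dict{String,Any}}[], true)

# Queue a message; returns immediately without touching the "socket".
sketch_send_msg!(b::SketchBatchedSend, msg::Dict{String,Any}) = push!(b.buffer, msg)

# Write everything queued so far as a single batch.
function sketch_flush!(b::SketchBatchedSend)
    isempty(b.buffer) && return b
    push!(b.sent, copy(b.buffer))
    empty!(b.buffer)
    return b
end

# Flush on a timer until the sender is closed.
function sketch_background_send(b::SketchBatchedSend)
    return @async begin
        while b.is_open
            sketch_flush!(b)
            sleep(b.interval)
        end
    end
end

# Stop the timer, then send whatever is left.
function sketch_close!(b::SketchBatchedSend, task::Task)
    b.is_open = false
    wait(task)
    sketch_flush!(b)
    return b
end

b = SketchBatchedSend(interval=0.002)
for i in 1:3
    sketch_send_msg!(b, Dict{String,Any}("index" => i))
end
task = sketch_background_send(b)
sketch_close!(b, task)
total = sum(Int[length(batch) for batch in b.sent])
```

Because queuing only appends to a buffer, callers never block on the network; the cost is a delay of up to one interval before a message leaves.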

Base.close(batchedsend::BatchedSend)

Try to send all remaining messages and then close the connection.
