Ravn::HAL::
ZmqSocketClient
class
A client for interacting with the ZMQ socket device in the BDE.
- endpoint R
The ZeroMQ endpoint to connect to
- on_message_callback R
The callable that is invoked for every incoming message from the BDE
- reactor R
The IO reactor used to manage the socket pair
- recv_queue R
The queue of messages that have arrived from the BDE device
- recv_socket R
The PULL socket used to read messages from the remote BDE
- send_queue R
The queue of messages to send to the BDE device
- send_socket R
The PUSH socket used to send messages to the remote BDE
- thread R
The Thread object responsible for the client’s IO
new( endpoint=Ravn::HAL::Device::ZmqSocket.endpoint, &callback )
Create a new client that will talk to the ZMQ socket device at +endpoint+. If a block is given, it is used as the callback for incoming messages.
# Create a new client for the ZMQ socket device at +endpoint+. If a block is
# given, it is stored as the callback for incoming messages. Nothing is
# connected or bound here.
def initialize( endpoint=Ravn::HAL::Device::ZmqSocket.endpoint, &callback )
  # Where to connect, and what to do with inbound messages
  @endpoint            = endpoint
  @on_message_callback = callback

  # The reactor and the PUSH/PULL socket pair it will manage
  @reactor     = CZTop::Reactor.new
  @send_socket = CZTop::Socket::PUSH.new
  @recv_socket = CZTop::Socket::PULL.new

  # Buffers for outgoing and incoming messages
  @send_queue, @recv_queue = [], []

  # No IO thread until #start assigns one
  @thread = nil
end
call_message_callback( encoded_data )
Decode a JSON message received from the BDE and pass it to the message callback, or append it to the receive queue if no callback is set.
# Decode +encoded_data+ (a JSON document) into an OpenStruct with symbol keys
# and dispatch it: if a message callback is set it is called with the message
# and its result is returned; otherwise the message is appended to the receive
# queue.
def call_message_callback( encoded_data )
  decoded = JSON.parse( encoded_data, symbolize_names: true )
  message = OpenStruct.new( decoded )
  handler = self.on_message_callback

  return handler.call( message ) if handler

  # Nobody is listening yet, so hold on to the message
  self.recv_queue << message
end
Connect to the BDE ZMQ socket device
# Set up the receiving socket first, then connect the sending socket and hand
# it the endpoint the receiving socket ended up bound to.
def connect
  self.connect_send_socket( self.setup_recv_socket )
end
connect_send_socket( recv_endpoint )
Set up the sending socket by connecting to the zmq BDE socket.
# Connect the sending socket to the client's endpoint, register it with the
# reactor for write events, and queue +recv_endpoint+ as the first outgoing
# message.
def connect_send_socket( recv_endpoint )
  socket = self.send_socket

  self.log.info( format("Connecting to %s", self.endpoint) )
  socket.options.linger = 0
  socket.connect( self.endpoint )
  self.log.info( "Connected." )

  # Route write-readiness events for the socket to #on_send_socket_event
  event_handler = self.method( :on_send_socket_event )
  self.reactor.register( socket, :write, &event_handler )
  self.enable_writing

  self.send_message( recv_endpoint )
end
Stop polling for writability.
# Ask the reactor to stop reporting write events for the sending socket.
def disable_writing
  socket = self.send_socket
  self.reactor.disable_events( socket, :write )
end
Start polling for writability if not already doing so.
# Ask the reactor to report write events for the sending socket.
def enable_writing
  socket = self.send_socket
  self.reactor.enable_events( socket, :write )
end
Set the callback that is called when an incoming message arrives.
# Install the given block as the handler for incoming messages, replacing any
# callback set previously (including one passed to the constructor). Calling
# this without a block clears the handler. Returns the new handler.
def on_message( &callback )
  @on_message_callback = callback
end
on_recv_socket_event( event )
Handle the receive socket becoming readable.
# Reactor callback for the receiving socket. On a readable event, receives a
# message from the event's socket and passes the value popped from it to
# #call_message_callback. Any other kind of event raises a RuntimeError.
def on_recv_socket_event( event )
  if event.readable?
    message = event.socket.receive
    # NOTE(review): presumably #pop yields the first frame of the message as a
    # String -- confirm against CZTop::Message
    self.call_message_callback( message.pop )
  elsif event.writable?
    # This handler is only registered for the receive socket, so name it
    # correctly in the diagnostic.
    raise "Receive socket became writable?!"
  else
    raise "Socket event was neither readable nor writable! (%s)" % [ event ]
  end
end
on_send_socket_event( event )
Reactor callback — handle an IO event on the sending socket.
# Reactor callback for the sending socket. On a writable event, sends the next
# queued message (if there is one) and stops polling for writability once the
# queue is empty. Any non-writable event raises a RuntimeError.
def on_send_socket_event( event )
  raise "Unexpected %p from the send socket" % [ event ] unless event.writable?

  outgoing = self.send_queue.shift
  outgoing.send_to( self.send_socket ) if outgoing

  # Nothing left to send, so stop asking for write events
  self.disable_writing if self.send_queue.empty?
end
Encode the given message and queue it for sending to the BDE device.
# Queue +message+ for delivery to the BDE device. Objects that respond to
# #to_h are serialized to JSON; anything else is used as the payload as-is.
# The message is wrapped in a CZTop::Message, appended to the send queue, and
# write polling is switched on so the reactor will send it.
def send_message( message )
  payload = message.respond_to?( :to_h ) ? JSON.generate( message.to_h ) : message

  self.send_queue.push( CZTop::Message.new(payload) )

  # Go through the shared helper rather than repeating its reactor call here
  self.enable_writing
end
Set up the receiving socket and return the endpoint it’s bound to.
# Bind the receiving socket to a wildcard TCP port on the address of the first
# interface reported by Zyre, register it with the reactor for read events, and
# return the endpoint the socket reports as its last endpoint.
def setup_recv_socket
  # NOTE(review): assumes Zyre.interfaces yields [name, info] pairs whose info
  # has an :address entry -- confirm
  first_interface = Zyre.interfaces.first
  recv_addr = first_interface[ 1 ][ :address ]
  recv_endpoint = format( "tcp://%s:*", recv_addr )
  socket = self.recv_socket

  self.log.info( format("Setting up the receive (PULL) socket: %s", recv_endpoint) )
  socket.options.linger = 0
  socket.bind( recv_endpoint )

  # Route read-readiness events for the socket to #on_recv_socket_event
  event_handler = self.method( :on_recv_socket_event )
  self.reactor.register( socket, :read, &event_handler )

  return socket.last_endpoint
end
Connect the client’s sockets and start sending/receiving messages.
# Connect the client's sockets, then run the reactor's polling loop in a new
# thread. The thread is stored as the client's IO thread and returned.
def start
  self.log.info( format("Starting ZMQ socket client for: %s", self.endpoint) )
  self.connect

  @thread = Thread.new do
    io_thread = Thread.current
    # An unhandled error in the IO loop should not go unnoticed
    io_thread.abort_on_exception = true
    io_thread.name = "zmq socket client I/O"

    self.reactor.start_polling
  end
end
Stop the client’s IO thread.
# Shut the client down. Does nothing beyond logging if the IO thread was never
# started; otherwise tells the reactor to stop polling, closes both sockets,
# and waits for the IO thread to finish.
def stop
  self.log.info( "Stopping ZMQ socket client." )

  io_thread = self.thread
  return unless io_thread

  # Order matters here: stop the loop and close the sockets before joining
  self.reactor.stop_polling
  self.recv_socket&.close
  self.send_socket&.close
  io_thread.join
end