Ravn::HAL::Device::
ZmqSocket
class
A ZeroMQ socket that speaks the Display protocol.
- PHONE_EVENT_FILTER
A specific list of events that are to be sent to the temporary phone proxy.
- gps_positions R
The hash of GPS position events to send coalesced on a timer.
- gps_sender R
The TimerTask that is sending aggregated GPS data on an interval.
- reactor R
The IO reactor that manages the two sockets
- recv_monitor R
The monitor that observes the receiving socket
- recv_socket R
The socket that receives incoming messages
- send_monitor R
The monitor that observes the sending socket
- send_socket R
The socket which gets sent outgoing messages
- write_queue R
The queue of pending data to be written.
Create a ZmqSocket
device.
def initialize( * )
@reactor = CZTop::Reactor.new
@reactor_thread = nil
@recv_socket = nil
@recv_monitor = nil
@send_socket = nil
@send_monitor = nil
@write_queue = []
@gps_positions = {}
@gps_sender = Concurrent::TimerTask.new { self.send_gps_messages }
@gps_sender.execution_interval = self.class.gps_frequency
@gps_sender.execute
super
end
Stop polling for writability.
def disable_writing
self.reactor.disable_events( self.send_socket, :write ) if self.send_socket_write_enabled?
end
dispatch_received_message( message )
Dispatch an incoming message created by deserializing the given data
.
def dispatch_received_message( message )
data = message.pop
if ( match = %r|\Atcp://\d+\.\d+\.\d+\.\d+:\d+|.match(data) )
send_endpoint = match[0]
if self.send_socket
self.log.info "Got a PUSH endpoint; reconnecting to %s" % [ send_endpoint ]
self.reconnect( send_endpoint )
else
self.log.info "Got a PUSH endpoint; establishing connection to %s" % [ send_endpoint ]
self.setup_send_socket( send_endpoint )
self.setup_send_monitor
end
self.send_startup_message
else
parsed_data = JSON.parse( data, symbolize_names: true )
type = parsed_data.delete( :type ) or
raise "malformed message %p; no type specified" % [ parsed_data ]
message = Ravn::HAL::Message.new( type, parsed_data )
self.log.info "Got a message from remote ZMQ socket: %p" % [ message ]
self.filter_up( message )
end
end
Start polling for writability if not already doing so.
def enable_writing
self.reactor.enable_events( self.send_socket, :write ) unless self.send_socket_write_enabled?
end
on_recv_monitor_event( event )
Reactor callback – handle a monitor event about the receiving socket.
def on_recv_monitor_event( event )
if event.readable?
msg = event.socket.receive
type, *payload = *msg
self.log.info "Recv socket: %s (%p)" % [ type, payload ]
else
raise "Monitor event expected to be readable"
end
end
on_recv_socket_event( event )
Reactor callback – handle an IO event on the receiving socket.
def on_recv_socket_event( event )
if event.readable?
msg = event.socket.receive
self.dispatch_received_message( msg )
elsif event.writable?
raise "Request socket became writable?!"
else
raise "Socket event was neither readable nor writable! (%s)" % [ event ]
end
end
on_send_monitor_event( event )
Reactor callback – handle a monitor event about the sending socket.
def on_send_monitor_event( event )
if event.readable?
msg = event.socket.receive
type, *payload = *msg
self.log.info "Send socket: %s (%p)" % [ type, payload ]
case type
when 'DISCONNECTED'
self.log.warn "ZMQ socket client disconnected; resetting the send socket."
self.stop_send_monitor
self.shutdown_send_socket
end
else
raise "Monitor event expected to be readable"
end
end
on_send_socket_event( event )
Reactor callback — handle an IO event on the sending socket.
def on_send_socket_event( event )
if event.writable?
if ( data = self.write_queue.shift )
data.send_to( self.send_socket )
end
self.disable_writing if self.write_queue.empty?
else
raise "Unexpected %p from the send socket" % [ event ]
end
end
reconnect( new_endpoint )
Disconnect an existing send socket and connect to the new_endpoint
.
def reconnect( new_endpoint )
self.stop_send_monitor
self.shutdown_send_socket
self.setup_send_socket( new_endpoint )
self.setup_send_monitor
end
def relay_message( message )
self.log.debug "Sending a %p message to the socket" % [ message.type ]
data = message.fields.merge( type: message.type )
data[:label] = message.callsign if message.respond_to?( :callsign )
encoded_data = JSON.generate( data )
self.log.debug "Sending JSON: %s" % [ encoded_data ]
self.write( encoded_data )
end
relay_timed_message( message )
def relay_timed_message( message )
self.log.debug "Caching a %p message for the socket" % [ message.type ]
self.gps_positions[ message.callsign ] = message.pos
end
Send all current GPS network events to the socket.
def send_gps_messages
return if self.gps_positions.empty?
data = {
id: Ravn.uuid.generate,
time: Ravn.time,
type: 'net.gps.positions',
data: self.gps_positions
}
encoded_data = JSON.generate( data )
self.gps_positions.clear
self.log.debug "Sending batched GPS JSON: %s" % [ encoded_data ]
self.write( encoded_data )
end
send_socket_registered?()
Returns true
if there is a send socket set up and registered with the reactor.
def send_socket_registered?
return self.send_socket &&
self.reactor.registered?( self.send_socket )
end
send_socket_write_enabled?()
Returns true
if the runner’s client socket is currently registered for writing.
def send_socket_write_enabled?
return self.send_socket_registered? &&
self.reactor.event_enabled?( self.send_socket, :write )
end
Emit the last seen mission controller startup message to the connecting client.
def send_startup_message
return unless self.startup_message
message = self.startup_message.fields.merge( type: self.startup_message.type )
message = JSON.generate( message )
self.log.debug "Sending startup message: %s" % [ message ]
self.write( message )
end
Set up the monitor for the receiving socket.
def setup_recv_monitor
@recv_monitor = self.reactor.register_monitor(
@recv_socket, :CONNECTED, :DISCONNECTED, &self.method(:on_recv_monitor_event) )
end
Set up the receiving socket.
def setup_recv_socket
recv_endpoint = self.class.endpoint
self.log.info "Binding receive socket to %s" % [ recv_endpoint ]
@recv_socket = CZTop::Socket::PULL.new
@recv_socket.options.linger = 0
@recv_socket.bind( recv_endpoint )
self.reactor.register( @recv_socket, :read, &self.method(:on_recv_socket_event) )
end
Set up the monitor for the sending socket.
def setup_send_monitor
@send_monitor = self.reactor.register_monitor(
@send_socket, :CONNECTED, :DISCONNECTED, &self.method(:on_send_monitor_event) )
end
setup_send_socket( endpoint )
Set up the send socket on the specified endpoint
.
def setup_send_socket( endpoint )
self.log.info "Connecting send socket to %s" % [ endpoint ]
@send_socket = CZTop::Socket::PUSH.new
@send_socket.options.linger = 0
@send_socket.connect( endpoint )
self.reactor.register( @send_socket, :write, &self.method(:on_send_socket_event) )
end
Shut down the receiving socket.
def shutdown_recv_socket
if @recv_socket
self.log.info "Shutting down the recv socket."
@recv_socket.options.linger = 0
@recv_socket.unbind( @recv_socket.last_endpoint ) if @recv_socket&.last_endpoint
@recv_socket = nil
end
end
Shut down the sending socket.
def shutdown_send_socket
if @send_socket
self.log.info "Shutting down the send socket."
self.reactor.unregister( @send_socket )
@send_socket.options.linger = 0
@send_socket.disconnect( @send_socket.last_endpoint ) if @send_socket.last_endpoint
@send_socket.close
else
self.log.info "Send socket not connected; skipping teardown."
end
@send_socket = nil
end
Shut down the sockets used by the device.
def shutdown_sockets
self.shutdown_send_socket
self.shutdown_recv_socket
end
Start the socket device.
def start
CZMQ::FFI::Signals.disable_default_handling
self.log.info "Starting the socket device"
self.setup_recv_socket
self.setup_recv_monitor
self.log.info "Starting poll loop."
@reactor_thread = Thread.new do
Thread.current.name = "ZMQ Socket Device I/O"
self.reactor.start_polling( ignore_interrupts: true )
end
end
Stop the socket device.
def stop
self.log.info "Stopping the socket device."
self.reactor.stop_polling
self.stop_monitors
self.gps_sender.shutdown
self.shutdown_sockets
end
Shut down the monitors for the device’s sockets.
def stop_monitors
self.stop_send_monitor
self.stop_recv_monitor
end
Stop monitoring the receiving socket.
def stop_recv_monitor
if @recv_monitor
self.log.info "Stopping the recv socket monitor."
self.reactor.unregister( @recv_monitor )
@recv_monitor.terminate
end
end
Stop monitoring the sending socket.
def stop_send_monitor
if @send_monitor
self.log.info "Stopping the send socket monitor."
self.reactor.unregister( @send_monitor )
@send_monitor.terminate
end
end
Queue up the given data
for writing.
def write( data )
if self.send_socket_registered?
self.write_queue << CZTop::Message.new( data )
self.enable_writing
else
self.log.debug "No sender yet; discarding write."
end
end