Ravn::HAL::Device::

ZmqSocket class

A ZeroMQ socket that speaks the Display protocol.

Constants

PHONE_EVENT_FILTER

A specific list of events that are to be sent to the temporary phone proxy.

Attributes

gps_positions R

The hash of GPS position events to send coalesced on a timer.

gps_sender R

The TimerTask that is sending aggregated GPS data on an interval.

reactor R

The IO reactor that manages the two sockets

recv_monitor R

The monitor that observes the receiving socket

recv_socket R

The socket that receives incoming messages

send_monitor R

The monitor that observes the sending socket

send_socket R

The socket which gets sent outgoing messages

write_queue R

The queue of pending data to be written.

Public Class Methods

new( * )

Create a ZmqSocket device.

# File lib/ravn/hal/device/zmq_socket.rb, line 47
def initialize( * )
        @reactor        = CZTop::Reactor.new
        @reactor_thread = nil

        @recv_socket    = nil
        @recv_monitor   = nil
        @send_socket    = nil
        @send_monitor   = nil

        @write_queue    = []
        @gps_positions  = {}

        @gps_sender = Concurrent::TimerTask.new { self.send_gps_messages }
        @gps_sender.execution_interval = self.class.gps_frequency
        @gps_sender.execute

        super
end

Public Instance Methods

disable_writing()

Stop polling for writability.

# File lib/ravn/hal/device/zmq_socket.rb, line 372
def disable_writing
        self.reactor.disable_events( self.send_socket, :write ) if self.send_socket_write_enabled?
end
dispatch_received_message( message )

Dispatch an incoming message created by deserializing the given data.

# File lib/ravn/hal/device/zmq_socket.rb, line 186
def dispatch_received_message( message )
        data = message.pop

        if ( match = %r|\Atcp://\d+\.\d+\.\d+\.\d+:\d+|.match(data) )
                send_endpoint = match[0]

                if self.send_socket
                        self.log.info "Got a PUSH endpoint; reconnecting to %s" % [ send_endpoint ]
                        self.reconnect( send_endpoint )
                else
                        self.log.info "Got a PUSH endpoint; establishing connection to %s" % [ send_endpoint ]
                        self.setup_send_socket( send_endpoint )
                        self.setup_send_monitor
                end
                self.send_startup_message

        else
                parsed_data = JSON.parse( data, symbolize_names: true )
                type = parsed_data.delete( :type ) or
                        raise "malformed message %p; no type specified" % [ parsed_data ]

                message = Ravn::HAL::Message.new( type, parsed_data )

                self.log.info "Got a message from remote ZMQ socket: %p" % [ message ]
                self.filter_up( message )
        end
end
enable_writing()

Start polling for writability if not already doing so.

# File lib/ravn/hal/device/zmq_socket.rb, line 366
def enable_writing
        self.reactor.enable_events( self.send_socket, :write ) unless self.send_socket_write_enabled?
end
on_recv_monitor_event( event )

Reactor callback – handle a monitor event about the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 284
def on_recv_monitor_event( event )
        if event.readable?
                msg = event.socket.receive
                type, *payload = *msg

                self.log.info "Recv socket: %s (%p)" % [ type, payload ]
        else
                raise "Monitor event expected to be readable"
        end
end
on_recv_socket_event( event )

Reactor callback – handle an IO event on the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 271
def on_recv_socket_event( event )
        if event.readable?
                msg = event.socket.receive
                self.dispatch_received_message( msg )
        elsif event.writable?
                raise "Request socket became writable?!"
        else
                raise "Socket event was neither readable nor writable! (%s)" % [ event ]
        end
end
on_send_monitor_event( event )

Reactor callback – handle a monitor event about the sending socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 392
def on_send_monitor_event( event )
        if event.readable?
                msg = event.socket.receive
                type, *payload = *msg

                self.log.info "Send socket: %s (%p)" % [ type, payload ]
                case type
                when 'DISCONNECTED'
                        self.log.warn "ZMQ socket client disconnected; resetting the send socket."
                        self.stop_send_monitor
                        self.shutdown_send_socket
                end
        else
                raise "Monitor event expected to be readable"
        end
end
on_send_socket_event( event )

Reactor callback — handle an IO event on the sending socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 378
def on_send_socket_event( event )
        if event.writable?
                if ( data = self.write_queue.shift )
                        data.send_to( self.send_socket )
                end

                self.disable_writing if self.write_queue.empty?
        else
                raise "Unexpected %p from the send socket" % [ event ]
        end
end
reconnect( new_endpoint )

Disconnect an existing send socket and connect to the new_endpoint.

# File lib/ravn/hal/device/zmq_socket.rb, line 343
def reconnect( new_endpoint )
        self.stop_send_monitor
        self.shutdown_send_socket
        self.setup_send_socket( new_endpoint )
        self.setup_send_monitor
end
relay_message( message )
# File lib/ravn/hal/device/zmq_socket.rb, line 136
def relay_message( message )
        self.log.debug "Sending a %p message to the socket" % [ message.type ]

        data = message.fields.merge( type: message.type )
        data[:label] = message.callsign if message.respond_to?( :callsign )

        encoded_data = JSON.generate( data )
        self.log.debug "Sending JSON: %s" % [ encoded_data ]

        self.write( encoded_data )
end
relay_timed_message( message )
# File lib/ravn/hal/device/zmq_socket.rb, line 151
def relay_timed_message( message )
        self.log.debug "Caching a %p message for the socket" % [ message.type ]
        self.gps_positions[ message.callsign ] = message.pos
end
send_gps_messages()

Send all current GPS network events to the socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 158
def send_gps_messages
        return if self.gps_positions.empty?
        data = {
                id: Ravn.uuid.generate,
                time: Ravn.time,
                type: 'net.gps.positions',
                data: self.gps_positions
        }
        encoded_data = JSON.generate( data )
        self.gps_positions.clear

        self.log.debug "Sending batched GPS JSON: %s" % [ encoded_data ]
        self.write( encoded_data )
end
send_socket_registered?()

Returns true if there is a send socket set up and registered with the reactor.

# File lib/ravn/hal/device/zmq_socket.rb, line 352
def send_socket_registered?
        return self.send_socket &&
                self.reactor.registered?( self.send_socket )
end
send_socket_write_enabled?()

Returns true if the runner’s client socket is currently registered for writing.

# File lib/ravn/hal/device/zmq_socket.rb, line 359
def send_socket_write_enabled?
        return self.send_socket_registered? &&
                self.reactor.event_enabled?( self.send_socket, :write )
end
send_startup_message()

Emit the last seen mission controller startup message to the connecting client.

# File lib/ravn/hal/device/zmq_socket.rb, line 412
def send_startup_message
        return unless self.startup_message
        message = self.startup_message.fields.merge( type: self.startup_message.type )
        message = JSON.generate( message )
        self.log.debug "Sending startup message: %s" % [ message ]
        self.write( message )
end
setup_recv_monitor()

Set up the monitor for the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 243
def setup_recv_monitor
        @recv_monitor = self.reactor.register_monitor(
                @recv_socket, :CONNECTED, :DISCONNECTED, &self.method(:on_recv_monitor_event) )
end
setup_recv_socket()

Set up the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 230
def setup_recv_socket
        recv_endpoint = self.class.endpoint
        self.log.info "Binding receive socket to %s" % [ recv_endpoint ]

        @recv_socket = CZTop::Socket::PULL.new
        @recv_socket.options.linger = 0
        @recv_socket.bind( recv_endpoint )

        self.reactor.register( @recv_socket, :read, &self.method(:on_recv_socket_event) )
end
setup_send_monitor()

Set up the monitor for the sending socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 309
def setup_send_monitor
        @send_monitor = self.reactor.register_monitor(
                @send_socket, :CONNECTED, :DISCONNECTED, &self.method(:on_send_monitor_event) )
end
setup_send_socket( endpoint )

Set up the send socket on the specified endpoint.

# File lib/ravn/hal/device/zmq_socket.rb, line 297
def setup_send_socket( endpoint )
        self.log.info "Connecting send socket to %s" % [ endpoint ]

        @send_socket = CZTop::Socket::PUSH.new
        @send_socket.options.linger = 0
        @send_socket.connect( endpoint )

        self.reactor.register( @send_socket, :write, &self.method(:on_send_socket_event) )
end
shutdown_recv_socket()

Shut down the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 250
def shutdown_recv_socket
        if @recv_socket
                self.log.info "Shutting down the recv socket."
                @recv_socket.options.linger = 0
                @recv_socket.unbind( @recv_socket.last_endpoint ) if @recv_socket&.last_endpoint
                @recv_socket = nil
        end
end
shutdown_send_socket()

Shut down the sending socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 316
def shutdown_send_socket
        if @send_socket
                self.log.info "Shutting down the send socket."
                self.reactor.unregister( @send_socket )

                @send_socket.options.linger = 0
                @send_socket.disconnect( @send_socket.last_endpoint ) if @send_socket.last_endpoint
                @send_socket.close
        else
                self.log.info "Send socket not connected; skipping teardown."
        end

        @send_socket = nil
end
shutdown_sockets()

Shut down the sockets used by the device.

# File lib/ravn/hal/device/zmq_socket.rb, line 216
def shutdown_sockets
        self.shutdown_send_socket
        self.shutdown_recv_socket
end
start()

Start the socket device.

# File lib/ravn/hal/device/zmq_socket.rb, line 105
def start
        CZMQ::FFI::Signals.disable_default_handling

        self.log.info "Starting the socket device"
        self.setup_recv_socket
        self.setup_recv_monitor

        # 100.times do |i|
        #     self.write_queue << CZTop::Message.new( %|{"id":%d, "type":"sys.stresstest"}| % [i] )
        # end

        self.log.info "Starting poll loop."
        @reactor_thread = Thread.new do
                Thread.current.name = "ZMQ Socket Device I/O"
                self.reactor.start_polling( ignore_interrupts: true )
        end
end
stop()

Stop the socket device.

# File lib/ravn/hal/device/zmq_socket.rb, line 125
def stop
        self.log.info "Stopping the socket device."
        self.reactor.stop_polling
        self.stop_monitors
        self.gps_sender.shutdown
        self.shutdown_sockets
end
stop_monitors()

Shut down the monitors for the device’s sockets.

# File lib/ravn/hal/device/zmq_socket.rb, line 223
def stop_monitors
        self.stop_send_monitor
        self.stop_recv_monitor
end
stop_recv_monitor()

Stop monitoring the receiving socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 261
def stop_recv_monitor
        if @recv_monitor
                self.log.info "Stopping the recv socket monitor."
                self.reactor.unregister( @recv_monitor )
                @recv_monitor.terminate
        end
end
stop_send_monitor()

Stop monitoring the sending socket.

# File lib/ravn/hal/device/zmq_socket.rb, line 333
def stop_send_monitor
        if @send_monitor
                self.log.info "Stopping the send socket monitor."
                self.reactor.unregister( @send_monitor )
                @send_monitor.terminate
        end
end
write( data )

Queue up the given data for writing.

# File lib/ravn/hal/device/zmq_socket.rb, line 175
def write( data )
        if self.send_socket_registered?
                self.write_queue << CZTop::Message.new( data )
                self.enable_writing
        else
                self.log.debug "No sender yet; discarding write."
        end
end