Ravn::HAL::

ZmqSocketClient class

A client for interacting with the ZMQ socket device in the BDE.

Attributes

endpoint R

The ZeroMQ endpoint to connect to

on_message_callback R

The callable that is invoked for every incoming message from the BDE

reactor R

The IO reactor used to manage the socket pair

recv_queue R

The queue of messages that have arrived from the BDE device

recv_socket R

The PULL socket used to read messages from the remote BDE

send_queue R

The queue of messages to send to the BDE device

send_socket R

The PUSH socket used to send messages to the remote BDE

thread R

The Thread object responsible for the client’s IO

Public Class Methods

new( endpoint=Ravn::HAL::Device::ZmqSocket.endpoint, &callback )

Create a new client that will talk to the ZMQ socket device at endpoint.

# File lib/ravn/hal/zmq_socket_client.rb, line 20
# Build a client for the ZMQ socket device at +endpoint+. Only the reactor and
# the two sockets are constructed here; nothing is bound or connected until
# #start (or #connect) is called. A block, if given, is stored as the callback
# for incoming messages.
def initialize( endpoint=Ravn::HAL::Device::ZmqSocket.endpoint, &callback )
        # Where to connect, and what to do with what comes back
        @endpoint = endpoint
        @on_message_callback = callback

        # IO plumbing: one reactor driving a PUSH/PULL socket pair
        @reactor = CZTop::Reactor.new
        @send_socket = CZTop::Socket::PUSH.new
        @recv_socket = CZTop::Socket::PULL.new

        # Outgoing/incoming message buffers, and the IO thread (set by #start)
        @send_queue = []
        @recv_queue = []
        @thread = nil
end

Public Instance Methods

call_message_callback( encoded_data )

Decode a JSON message received from the BDE and pass it to the message callback, or append it to the receive queue if no callback is set.

# File lib/ravn/hal/zmq_socket_client.rb, line 194
# Decode +encoded_data+ (a JSON document) into an OpenStruct with symbol keys
# and dispatch it: to the message callback when one is set, otherwise onto the
# receive queue. Returns whatever the callback returns, or the receive queue.
def call_message_callback( encoded_data )
        message = OpenStruct.new( JSON.parse(encoded_data, symbolize_names: true) )
        handler = self.on_message_callback

        return handler.call( message ) if handler

        # Nobody is listening yet; keep the message for later inspection
        self.recv_queue << message
end
connect()

Connect to the BDE ZMQ socket device

# File lib/ravn/hal/zmq_socket_client.rb, line 102
# Bring up both sockets: the receiving socket is set up first, and the
# endpoint it reports is handed to the sending socket's connect step.
def connect
        self.connect_send_socket( self.setup_recv_socket )
end
connect_send_socket( recv_endpoint )

Set up the sending socket by connecting to the zmq BDE socket.

# File lib/ravn/hal/zmq_socket_client.rb, line 153
# Connect the sending (PUSH) socket to the client's endpoint, register it with
# the reactor for write events, and queue +recv_endpoint+ as the first
# outgoing message.
# NOTE(review): sending the receive endpoint first presumably tells the device
# where to push its messages -- confirm against the device implementation.
def connect_send_socket( recv_endpoint )
        target = self.endpoint
        socket = self.send_socket

        self.log.info "Connecting to #{target}"
        socket.options.linger = 0
        socket.connect( target )
        self.log.info "Connected."

        # Write events on the socket are routed to #on_send_socket_event
        handler = self.method( :on_send_socket_event )
        self.reactor.register( socket, :write, &handler )
        self.enable_writing

        self.send_message( recv_endpoint )
end
disable_writing()

Stop polling for writability.

# File lib/ravn/hal/zmq_socket_client.rb, line 174
# Tell the reactor to stop watching the sending socket for write events.
def disable_writing
        poller = self.reactor
        poller.disable_events( self.send_socket, :write )
end
enable_writing()

Start polling for writability if not already doing so.

# File lib/ravn/hal/zmq_socket_client.rb, line 168
# Ask the reactor to watch the sending socket for write events.
def enable_writing
        poller = self.reactor
        poller.enable_events( self.send_socket, :write )
end
on_message( &callback )

Set the callback that is called when an incoming message arrives.

# File lib/ravn/hal/zmq_socket_client.rb, line 72
# Install +callback+ (the given block) as the handler for incoming messages,
# replacing any previous one. Calling this without a block clears the handler.
# Returns the stored callback.
def on_message( &callback )
        @on_message_callback = callback
        return callback
end
on_recv_socket_event( event )

Handle the receive socket becoming readable.

# File lib/ravn/hal/zmq_socket_client.rb, line 124
# Reactor callback -- handle an IO event on the receiving (PULL) socket.
#
# On a readable event, a message is read from the event's socket and the part
# popped off of it is passed to #call_message_callback. Any other event is
# unexpected for this socket and raises a RuntimeError.
def on_recv_socket_event( event )
        if event.readable?
                message = event.socket.receive
                self.call_message_callback( message.pop )

        elsif event.writable?
                # This socket is only registered for reads, so this should not happen
                raise "Receive socket became writable?!"

        else
                raise "Socket event was neither readable nor writable! (%p)" % [ event ]
        end
end
on_send_socket_event( event )

Reactor callback — handle an IO event on the sending socket.

# File lib/ravn/hal/zmq_socket_client.rb, line 139
# Reactor callback -- handle an IO event on the sending (PUSH) socket.
#
# Each writable event sends at most one queued message. Once the queue is
# empty, write polling is switched off so the reactor stops reporting
# writability until another message is queued. Any non-writable event raises
# a RuntimeError.
def on_send_socket_event( event )
        unless event.writable?
                raise "Unexpected %p from the send socket" % [ event ]
        end

        pending = self.send_queue
        outgoing = pending.shift
        outgoing.send_to( self.send_socket ) if outgoing

        self.disable_writing if pending.empty?
end
send_message( message )

Encode the given message and send it to the BDE device.

# File lib/ravn/hal/zmq_socket_client.rb, line 180
# Queue +message+ for delivery to the BDE device and make sure the reactor is
# polling the sending socket for writability.
#
# Objects that respond to #to_h are converted to a Hash and serialized as
# JSON; anything else (e.g. an already-encoded String) is sent as-is. The
# actual write happens later, in #on_send_socket_event.
def send_message( message )
        payload = message.respond_to?( :to_h ) ? JSON.generate( message.to_h ) : message

        self.send_queue.push( CZTop::Message.new(payload) )

        # Use the shared helper rather than repeating the reactor call here
        self.enable_writing
end
setup_recv_socket()

Set up the receiving socket and return the endpoint it’s bound to.

# File lib/ravn/hal/zmq_socket_client.rb, line 109
# Bind the receiving (PULL) socket on the address of the first interface Zyre
# reports, register it with the reactor for read events, and return the
# endpoint the socket reports as its last-bound one.
# NOTE(review): the `*` port presumably lets ZeroMQ choose a free port, which
# would be why the real endpoint is read back after binding -- confirm.
def setup_recv_socket
        interface = Zyre.interfaces.first
        bind_spec = "tcp://%s:*" % [ interface[1][:address] ]
        socket = self.recv_socket

        self.log.info "Setting up the receive (PULL) socket: %s" % [ bind_spec ]
        socket.options.linger = 0
        socket.bind( bind_spec )

        # Read events on the socket are routed to #on_recv_socket_event
        handler = self.method( :on_recv_socket_event )
        self.reactor.register( socket, :read, &handler )

        return socket.last_endpoint
end
start()

Connect the client’s sockets and start sending/receiving messages.

# File lib/ravn/hal/zmq_socket_client.rb, line 78
# Connect the sockets, then run the reactor's polling loop in a new thread.
# The thread is stored in @thread and returned. Because the thread aborts on
# exception, an error raised inside the polling loop is not silently lost.
def start
        self.log.info "Starting ZMQ socket client for: #{self.endpoint}"
        self.connect

        @thread = Thread.new do
                io_thread = Thread.current
                io_thread.name = "zmq socket client I/O"
                io_thread.abort_on_exception = true

                self.reactor.start_polling
        end
end
stop()

Stop the client’s IO thread.

# File lib/ravn/hal/zmq_socket_client.rb, line 90
# Shut the client down. If an IO thread exists: stop the reactor's polling,
# close the receiving and then the sending socket, and wait for the thread to
# finish. Does nothing beyond logging when the client has no thread.
def stop
        self.log.info "Stopping ZMQ socket client."
        return unless self.thread

        self.reactor.stop_polling
        [ self.recv_socket, self.send_socket ].each {|socket| socket&.close }

        self.thread.join
end