Ravn::

SocketBroker class

Constants

MAX_READ_SIZE

Maximum number of bytes to read in a single operation

Attributes

interval R

The IO interval (in floating-point seconds)

io_timer R

The Concurrent::TimerTask that executes the IO loop on an interval

read_buffer R

The device’s read buffer

socket R

The socket the broker is managing

write_buffer R

The device’s write buffer

Public Class Methods

new( socket, interval=0.5 )

Create a new SocketBroker that will manage IO for the given socket, encoding and decoding raw data using TNetstring, and telling its parent about any data read. The IO loop runs at most every interval seconds.

# File lib/ravn/socket_broker.rb, line 42
def initialize( socket, interval=0.5 )
        super()

        @socket        = socket
        @interval      = interval

        @read_buffer   = String.new( '', encoding: Encoding::ASCII_8BIT )
        @write_buffer  = String.new( '', encoding: Encoding::ASCII_8BIT )

        @io_timer      = self.make_io_timer

        # These actions are only called if this instance is being
        # spawned, rather than being instantianted directly via #new.
        self.start if Concurrent::Actor.current
end

Public Instance Methods

default_executor()

Use the IO-bound thread pool instead of the default

# File lib/ravn/socket_broker.rb, line 216
def default_executor
        Concurrent.global_io_executor
end
handle_io( * )

Do a nonblocking read and write from the socket.

# File lib/ravn/socket_broker.rb, line 122
def handle_io( * )
        readable = writeable = err = nil
        fdset = [self.socket]

        if self.write_buffer.empty?
                readable, writable, err = IO.select(fdset, [], fdset, 0 )
        else
                readable, writable, err = IO.select(fdset, fdset, fdset, 0 )
        end

        self.log.debug "Readable: %p, writable: %p, err: %p" % [ readable, writable, err ]

        if readable && !readable.empty?
                self.log.debug "Socket was ready for reading (%p is not nil or empty?)." % [ readable ]
                self.read_from_socket
        end

        if writable && !writable.empty?
                self.log.debug "Socket was ready for writing (%p is not nil or empty?)." % [ writable ]
                self.write_to_socket
        end

        self.publish_messages_from_read_buffer
end
make_io_timer()

Create a return a Concurrent::TimerTask that will execute the IO loop every

interval seconds.

# File lib/ravn/socket_broker.rb, line 206
def make_io_timer
        self.log.info "Creating IO timer with interval: %0.3f" % [ self.interval ]
        timer = Concurrent::TimerTask.new( execution_interval: self.interval, &self.method(:handle_io) )
        timer.add_observer( Ravn::LoggingTaskObserver.new(:io_timer) )

        return timer
end
on_event( event )

Actor API – handle actor lifecycle events.

# File lib/ravn/socket_broker.rb, line 108
def on_event( event )
        self.log.debug "Got lifecycle event: %p" % [ event ]
        event = event.first if event.is_a?( Array )

        case event
        when :terminated, :reset
                self.stop
        end

        super
end
on_message( message )

Actor API – handle incoming +message+s.

# File lib/ravn/socket_broker.rb, line 100
def on_message( message )
        self.log.debug "Appending message %p to the write buffer." % [ message ]
        data = TNetstring.dump( message )
        self.write_buffer << data
end
publish_messages_from_read_buffer()

Try to decode any data in the read buffer, and publish any resulting objects to the parent.

# File lib/ravn/socket_broker.rb, line 180
def publish_messages_from_read_buffer
        until self.read_buffer.empty?
                begin
                        object, remainder = TNetstring.parse( self.read_buffer )
                        self.read_buffer.replace( remainder )
                rescue TNetstring::ProcessError => err
                        # No-op
                        self.log.debug do
                                "%p while parsing tnetstring: probably partial object in buffer: %s" %
                                        [ err.class, err.message ]
                        end
                end

                if object
                        self.log.debug "Publishing incoming message %p to parent actor: %p" %
                                [ object, self.parent ]
                        self.parent.tell( incoming_message: object )
                else
                        return
                end
        end
end
read_from_socket()

Read any pending data from the read socket and append it to the read buffer.

# File lib/ravn/socket_broker.rb, line 149
def read_from_socket
        self.read_buffer << self.socket.read_nonblock( MAX_READ_SIZE )

rescue IO::WaitReadable
        self.log.debug "read would block; skipping"
rescue EOFError, Errno::ECONNRESET
        self.log.warn "Socket has closed. Exiting."
        self.tell( :terminate! )
rescue => err
        self.log.error "Unhandled %p when reading: %s" % [ err.class, err.message ]
        self.tell( :terminate! )
end
start()

Start the broker thread.

# File lib/ravn/socket_broker.rb, line 86
def start
        self.log.info "Starting up."
        self.io_timer.execute
end
stop()

Stop the broker.

# File lib/ravn/socket_broker.rb, line 93
def stop
        self.log.info "Shutting down."
        self.io_timer.shutdown
end
write_to_socket()

Write any pending data in the write buffer to the socket.

# File lib/ravn/socket_broker.rb, line 164
def write_to_socket
        bytes = self.socket.write_nonblock( self.write_buffer )
        self.write_buffer.slice!( 0, bytes )
rescue IO::WaitWritable, Errno::EINTR
        self.log.debug "write would block; skipping"
rescue Errno::ENOTCONN
        self.log.warn "Socket has closed. Exiting."
        self.tell( :terminate! )
rescue => err
        self.log.error "Unhandled %p when writing: %s" % [ err.class, err.message ]
        self.tell( :terminate! )
end