Ravn::BDE::
NetBroker
class
The actor wrapper around the Ravn::Net::Broker
used for net-side messaging.
Emits: - sys.peers.info - sys.network.info
Consumes: - bolt.sent.* - bolt.responded.* - bolt.finished.* - direct.bolt.* - sys.gps.position
:todo: Rename this to NetManager to avoid confusion between it and Net::Broker.
- DEFAULT_HAE
Default HAE (height above ellipsoid, in meters) before the GPS starts sending updates.
- DEFAULT_POSITION
Default latitude and longitude before the GPS starts sending updates.
- RELAYED_BDE_MESSAGES
The kinds of BDE messages that get relayed to the network
- broker R
The Ravn::Net::Broker
the actor is managing.
- current_hae RW
The unit’s current HAE, updated via sys.gps
messages from the HAL. This is used to add elevation data to outbound events.
- current_position R
The unit’s current position, updated via sys.gps
messages from the HAL. This is used to add positioning data to outbound events.
- network_info_timer R
The timer that relays changing network state and logs peers
new( node_name=nil, **options )
Create and start a Ravn::Net::Broker
with the given node_name
and options.
def initialize( node_name=nil, **options )
@broker = Ravn::Net::Broker.new( node_name, **options ) do |zyre|
zyre.verbose! if $VERBOSE
end
@current_position = DEFAULT_POSITION.dup
@current_hae = DEFAULT_HAE
@network_info_timer = self.setup_network_info_timer
super
end
broadcast_position( gps_message )
def broadcast_position( gps_message )
self.log.debug "Recasting a `%s` event as `net.gps.position`" % [ gps_message.type ]
net_message = Ravn::Net::Message.recast( gps_message, 'net.gps.position' )
self.broker.async.send_message( net_message )
self.current_position.replace( gps_message.data[:pos] )
self.current_hae = gps_message.data[:hae]
self.log.debug "Updated current position for messages: [%02.f, %0.2f] @%0.2fm." %
[ *self.current_position, self.current_hae ]
end
Use the IO execution thread pool to avoid deadlocking non-io threads
def default_executor
return Concurrent.global_io_executor
end
Message handler for peer map requests.
def handle_peer_map( * )
return self.peers_map
end
handle_restarting_event( * )
Handle being restarted by shutting down the timer that logs peer connections.
def handle_restarting_event( * )
self.log.error "Restarting; shutting down the peer logging timer"
self.network_info_timer.shutdown
end
Return the details of inspect output.
def inspect_details
base = super
return "%s node: %s, %d peers" % [
base,
self.broker.inspect_details,
self.broker.node.peers.length
]
end
Log which peers we can see.
def log_zyre_peers
signs = self.broker.peer_uuids.invert
signwidth = signs.values.map( &:size ).max || 0
output = self.peers_map.map do |peer_uuid, info|
callsign = "{%s}" % [ signs[peer_uuid] ]
headers = info[:headers]
details = if headers
"v%<ravn_net_version>s (Protocol %<helios_protocol_version>d) @ %<started_at>s" %
headers
else
"~~no headers~~"
end
" %s %#{signwidth + 2}s <- %-27s: %s" % [
peer_uuid,
callsign,
info[:address],
details,
]
end
if output.empty?
self.log.warn "No mesh peers connected."
else
self.log.info "Mesh peers are:\n%s" % [ output.join("\n") ]
end
rescue => err
self.log.error "%p while making peer log: %s" % [ err.class, err.message ]
self.log.debug( err.full_message(order: :bottom) )
end
Return a Hash of peer UUIDs to information about the corresponding Zyre peer.
def peers_map
node = self.broker.node
return node.peers.each_with_object( {} ) do |peer_uuid, info|
info[ peer_uuid ] = {
headers: self.broker.peer_headers[ peer_uuid ],
address: node.peer_address( peer_uuid )
}
end
end
send_helios_message( bolt_message )
def send_helios_message( bolt_message )
self.log.debug "Sending a %p message to the broker" % [ bolt_message.type ]
self.log.debug " message: %p" % [ bolt_message ]
message = Ravn::Net::Message.recast( bolt_message, bolt_message.type )
message.pos ||= self.current_position
message.hae ||= self.current_hae
self.broker.async.send_message( message )
end
Send a message if the network interface state changes.
def send_network_info
@last_link_state ||= {}
current_state = self.broker.interface_state
unless @last_link_state[ :link ] == current_state[ :link ]
self.log.debug "Link change detected: Sending network state event"
msg = Ravn::BDE::Message.new( 'sys.network.info', data: current_state )
self.filter_up( msg )
end
@last_link_state = current_state
end
Send a message to displays describing how many peers they’re connected to.
def send_peer_info
self.log.debug "Sending peer info event"
count = self.broker.node.peers.length
msg = Ravn::BDE::Message.new( 'sys.peers.info', data: {count: count} )
self.filter_up( msg )
end
setup_network_info_timer()
Create the timer for the peer logger.
def setup_network_info_timer
self.log.info "Starting network information timer"
timer = Concurrent::TimerTask.new do
self.send_network_info
self.log_zyre_peers
self.send_peer_info
end
timer.execution_interval = 5
timer.add_observer( Ravn::LoggingTaskObserver.new(:network_info_timer) )
return timer
end
Start the broker on Actor startup once its interface is ready.
def start
super
self.broker.on_any_event( &self.method(:tell) )
self.broker.start
self.network_info_timer.execute
end