Ravn::Tactical::

Discovery module

Functions for discovery of other compute packs on the LAN.

Refs: - api.zeromq.org/czmq3-0:zbeacon

Constants

BEACON_KEYS

The sorted list of discovery hash keys.

BEACON_PREFIX

The beacon “type”, for filtering

BROADCAST_INTERVAL

How often to broadcast, in seconds

DISCOVERY_DATA_TIMEOUT

How long since the last beacon was received from a node before its discovery data is expired.

EXPIRATION_INTERVAL

How often to check for expired discovery data

LOCAL_UPDATE_INTERVAL

How often to refresh local node data in the discovery table

POLL_TIMEOUT

How many seconds to wait for an incoming broadcast in the poll loop

PORT

Port to use for discovery

Public Class Methods

broadcast_data( overrides={} )

Return the String of data that is used for the broadcast.

    # File lib/ravn/tactical/discovery.rb
# Build the String this node publishes in its discovery beacon.
#
# The values of .node_data (with +overrides+ merged on top) are packed with
# MessagePack, Base64-encoded with the 'm0' pack directive, and prefixed with
# BEACON_PREFIX and a single space.
#
# overrides:: Hash merged over the local node data before packing.
#
# Returns the encoded String. Raises a RuntimeError when the encoded String is
# larger than CZTop::Beacon::MAX_BEACON_DATA bytes.
def self::broadcast_data( overrides={} )
    values = self.node_data.merge( overrides ).values
    payload = [ MessagePack.pack(values) ].pack( 'm0' )
    encoded = BEACON_PREFIX + ' ' + payload

    size = encoded.bytesize
    self.log.debug "Encoded broadcast is %d bytes" % [ size ]

    limit = CZTop::Beacon::MAX_BEACON_DATA
    return encoded unless size > limit

    raise "discovery broadcast is too large (%d bytes > %d)" % [ size, limit ]
end
create_beacon()

Create and return a CZTop::Beacon that will be used to discover other nodes.

    # File lib/ravn/tactical/discovery.rb
# Set up the CZTop::Beacon used for discovery: configure it on PORT, subscribe
# it to BEACON_PREFIX, and start publishing this node's broadcast data every
# BROADCAST_INTERVAL seconds (the interval is passed to the beacon in
# milliseconds).
#
# Returns a two-element Array of the beacon and the value returned by
# configuring it, which is logged as the host the beacon started on.
def self::create_beacon
    zbeacon = CZTop::Beacon.new
    bound_host = zbeacon.configure( PORT )

    zbeacon.verbose! if [ $DEBUG, $VERBOSE ].any?
    zbeacon.subscribe( BEACON_PREFIX )

    self.log.debug "Publishing: %p" % [ self.broadcast_data ]
    interval_ms = BROADCAST_INTERVAL * 1000
    zbeacon.publish( self.broadcast_data, interval_ms )

    self.log.info "  beacon started on %s:%d" % [ bound_host, PORT ]

    return [ zbeacon, bound_host ]
end
decode_broadcast_data( ip, encoded_data )

Decode the specified encoded_data, sanity-check it, and return it if it appears valid.

    # File lib/ravn/tactical/discovery.rb
# Decode the Base64/MessagePack +encoded_data+ broadcast by the node at +ip+,
# pair the unpacked values with BEACON_KEYS, and build a node from them.
#
# ip::           The sender's address; only used in log messages.
# encoded_data:: The part of the beacon frame that follows the prefix.
#
# Returns the node if it is valid for discovery. Returns nil (after logging an
# error) if the data cannot be decoded, is not an Array of values, or produces
# a node that fails validation.
def self::decode_broadcast_data( ip, encoded_data )
    # The strict 'm0' directive raises ArgumentError on malformed Base64.
    packed_data = encoded_data.unpack1( 'm0' ) or
        raise ArgumentError, "invalid encoded data"
    unpacked = MessagePack.unpack( packed_data )

    # Broadcasts arrive from the network, so a well-formed payload can still
    # have the wrong shape. Anything but an Array would make the zip below
    # raise a TypeError that is not rescued here.
    unless unpacked.is_a?( Array )
        raise ArgumentError, "expected an Array of values, got %p" % [ unpacked.class ]
    end

    node_data = BEACON_KEYS.zip( unpacked ).to_h
    node = self.make_merged_node( node_data )

    unless node.valid_for_discovery?
        self.log.error "Ignoring broadcast from %s: %s" %
            [ ip, node.errors.full_messages.join('; ') ]
        return nil
    end

    return node
rescue ArgumentError, EOFError, MessagePack::UnpackError => err
    # EOFError covers truncated payloads; UnpackError is the parent of
    # MalformedFormatError and msgpack's other unpacking failures.
    # Log the encoded form: it is always set, whereas the decoded bytes are
    # still nil when the Base64 step is what failed.
    self.log.error "Malformed broadcast: %p: %s" % [ encoded_data, err.message ]
    return nil
end
expire_discovery_data( * )

Expire discovery data of nodes that haven’t beaconed in a while.

    # File lib/ravn/tactical/discovery.rb
# Remove every entry from the node table whose +last_seen_at+ is at or before
# DISCOVERY_DATA_TIMEOUT seconds ago. Arguments are accepted and ignored so
# the method can be used as a timer task body.
#
# Returns the node table.
def self::expire_discovery_data( * )
    cutoff = Time.now - DISCOVERY_DATA_TIMEOUT

    self.nodes.delete_if do |device_id, node|
        # Seen after the cutoff: keep it.
        next false if node.last_seen_at > cutoff

        self.log.debug "Expiring discovery data for %s" % [ device_id ]
        true
    end
end
handle_broadcasts()

Loop over the beacon, adding nodes for incoming broadcasts.

    # File lib/ravn/tactical/discovery.rb
# Body of the discovery thread: start the local-update and expiration timers,
# then poll the beacon and process incoming broadcasts until the beacon is
# cleared or its actor dies. The timers are shut down when the loop ends,
# whether it finishes normally or is aborted by an exception.
def self::handle_broadcasts
    Thread.current.name = "Discovery Broadcaster" unless Thread.current == Thread.main
    self.log.info "Waiting for broadcasts."
    poller = CZTop::Poller::ZPoller.new( self.beacon.actor )

    self.local_update_timer.execute
    self.expiration_timer.execute

    begin
        # Read the beacon once per pass: .stop sets it to nil from another
        # thread, so checking it and then re-reading it could call #actor
        # on nil.
        while (beacon = self.beacon) && !beacon.actor.dead?
            # POLL_TIMEOUT is in seconds; the poller is given milliseconds.
            self.read_node_broadcast if poller.wait( POLL_TIMEOUT * 1000 )
        end
    ensure
        # Without this an exception raised while handling a broadcast would
        # leave both timer tasks running after the thread has died.
        self.expiration_timer.shutdown
        self.local_update_timer.shutdown
    end

    self.log.info "Stopped waiting for broadcasts."
end
make_expiration_timer()

Return a Concurrent::TimerTask that will expire old discovery data.

    # File lib/ravn/tactical/discovery.rb
# Build (but do not start) the Concurrent::TimerTask that calls
# .expire_discovery_data every EXPIRATION_INTERVAL. A
# Ravn::LoggingTaskObserver is attached to the task.
#
# Returns the timer task.
def self::make_expiration_timer
    expire = self.method( :expire_discovery_data )

    return Concurrent::TimerTask.new( &expire ).tap do |task|
        task.execution_interval = EXPIRATION_INTERVAL
        task.add_observer( Ravn::LoggingTaskObserver.new(:discovery_expiration_timer) )
    end
end
make_local_update_timer()

Return a Concurrent::TimerTask that will update node discovery data for the local node.

    # File lib/ravn/tactical/discovery.rb
# Build (but do not start) the Concurrent::TimerTask that calls
# .update_local_info every LOCAL_UPDATE_INTERVAL. The task is created with
# +run_now: true+ and has a Ravn::LoggingTaskObserver attached.
#
# Returns the timer task.
def self::make_local_update_timer
    refresh = self.method( :update_local_info )

    return Concurrent::TimerTask.new( run_now: true, &refresh ).tap do |task|
        task.execution_interval = LOCAL_UPDATE_INTERVAL
        task.add_observer( Ravn::LoggingTaskObserver.new(:local_update_timer) )
    end
end
make_merged_node( discovery_data )

Make a Ravn::Tactical::Node out of the given discovery_data, merging it with the database version if it exists.

    # File lib/ravn/tactical/discovery.rb
# Build a Ravn::Tactical::Node from +discovery_data+. If a node with the same
# device ID is already persisted, the discovered data is merged into that
# record and the persisted node is returned; otherwise the freshly built node
# is returned as-is.
#
# discovery_data:: Hash of attributes for the discovered node.
def self::make_merged_node( discovery_data )
    incoming = Ravn::Tactical::Node.new( discovery_data )
    persisted = Ravn::Tactical::Node[ device_id: incoming.device_id ]

    unless persisted
        self.log.debug "Node %s is not registered" % [ incoming.device_id ]
        return incoming
    end

    self.log.debug "Merging discovery data with persisted data"
    persisted.merge_discovery_data( incoming )

    return persisted
end
node_data()

Return a Hash of this node’s discovery information.

    # File lib/ravn/tactical/discovery.rb
# Return the frozen Hash of this node's discovery information, building and
# caching it in @node_data on first use.
#
# The values are paired positionally with the sorted BEACON_KEYS, so their
# order below has to match that key order.
def self::node_data
    return @node_data if @node_data

    mission = Ravn::Tactical::Mission.current
    self.log.debug "Mission for discovery is: %p" % [ mission ]

    # Fall back to the host-based callsign if the current one doesn't validate.
    callsign =
        begin
            Ravn::BDE.callsign
        rescue Ravn::BDE::ValidationError
            self.log.debug "Ignoring validation error while fetching current callsign."
            Ravn::BDE.get_hostbased_callsign
        end

    keys = BEACON_KEYS.sort
    values = [
        callsign,
        Ravn::Tactical.control_device_id,
        Ravn.device_id,
        mission&.chksum, # nil when there is no current mission
        Zyre.z85_encode( Ravn::Crypto.public_key ),
        Ravn.time.to_i,
        Ravn::Tactical::VERSION
    ]

    @node_data = keys.zip( values ).to_h.freeze
    return @node_data
end
read_node_broadcast()

Reads an incoming broadcast from a node and updates the node table with it.

    # File lib/ravn/tactical/discovery.rb
# Receive one message from the beacon and update the node table with it.
# Does nothing if there is no beacon; a dead beacon actor is logged at debug
# level and otherwise ignored.
def self::read_node_broadcast
    # Read the beacon once: .stop sets it to nil from another thread, so
    # testing it and then re-reading it could call #receive on nil.
    beacon = self.beacon or return

    self.update_node( beacon.receive )
rescue CZTop::Actor::DeadActorError
    self.log.debug "Couldn't read broadcast: beacon actor died."
end
reset()

Reset discovery, clearing any discovered nodes.

    # File lib/ravn/tactical/discovery.rb
# Stop discovery, empty the node table, and drop the cached local node data
# so it is rebuilt the next time it is needed.
#
# Returns nil.
def self::reset
    self.log.info "Resetting discovery info."

    self.stop
    self.nodes.clear

    return @node_data = nil
end
start()

Start discovery.

    # File lib/ravn/tactical/discovery.rb
# Start discovery: create the beacon and both timers, then spawn the thread
# that runs .handle_broadcasts.
#
# If an interface is configured, it is exported as ZSYS_INTERFACE before the
# beacon is created.
#
# Returns the listener Thread. Raises a RuntimeError if discovery is already
# started.
def self::start
    if self.started?
        raise "Already started?! (thread = %p, beacon = %p)" % [ self.thread, self.beacon ]
    end

    iface_name = self.interface
    if iface_name
        self.log.debug "Will use the %p interface." % [ iface_name ]
        ENV['ZSYS_INTERFACE'] = iface_name
    end

    self.log.info "Starting discovery beacon."
    @beacon, @hostname = self.create_beacon
    self.log.debug "Beacon is at: %s" % [ @hostname ]

    @local_update_timer = self.make_local_update_timer
    @expiration_timer = self.make_expiration_timer

    @thread = Thread.new( &self.method(:handle_broadcasts) )
    return @thread
end
started?()

Return true if Discovery has been started.

    # File lib/ravn/tactical/discovery.rb
# Return +true+ if discovery has either a listener thread or a beacon,
# +false+ otherwise.
def self::started?
    return true if self.thread
    return self.beacon ? true : false
end
stop()

Stop discovery.

    # File lib/ravn/tactical/discovery.rb
# Stop discovery: terminate and forget the beacon, give the listener thread
# up to 2 seconds to finish before forgetting it, and drop the cached local
# node data.
#
# Returns nil.
def self::stop
    self.log.info "Stopping discovery beacon."

    @beacon.terminate if @beacon
    @beacon = nil

    if @thread && @thread.alive?
        @thread.join( 2 )
    end
    @thread = nil

    @node_data = nil
end
update_local_info( * )

Update the information about the local node in the node table.

    # File lib/ravn/tactical/discovery.rb
# Refresh this node's own entry in the node table. Arguments are accepted and
# ignored so the method can be used as a timer task body.
#
# The cached node data is discarded and rebuilt, re-published through the
# beacon if there is one, and then stored in the node table under this
# node's device ID with the local IP and the current time.
#
# A dead beacon actor is logged as an error and otherwise ignored.
def self::update_local_info( * )
    self.log.debug "Updating local node discovery info"
    @node_data = nil

    if (active_beacon = self.beacon)
        # BROADCAST_INTERVAL is in seconds; the beacon is given milliseconds.
        active_beacon.publish( self.broadcast_data, BROADCAST_INTERVAL * 1000 )
    end

    local_fields = { ip: IPAddr.new(self.hostname), last_seen_at: Time.now }
    local_node = self.make_merged_node( self.node_data.merge(local_fields) )

    self.nodes[ Ravn.device_id ] = local_node
rescue CZTop::Actor::DeadActorError
    self.log.error "Couldn't send update; beacon is shutting down."
end
update_node( message )

Update the node that sent the given zbeacon message.

    # File lib/ravn/tactical/discovery.rb
# Record the node that sent the zbeacon +message+ in the node table.
#
# message:: The received message; frame 0 is read as the sender's IP address
#           and frame 1 as "<prefix> <encoded data>".
#
# Messages with the wrong prefix, no data, or data that doesn't decode to a
# valid node are logged and ignored. Raises a RuntimeError if either frame is
# missing.
def self::update_node( message )
    ip_frame = message[ 0 ]
    raise "malformed zbeacon message: no frames!" unless ip_frame

    data_frame = message[ 1 ]
    raise "malformed zbeacon message: no data frame!" unless data_frame

    sender = IPAddr.new( ip_frame )
    prefix, payload = data_frame.split( ' ', 2 )

    unless prefix == BEACON_PREFIX
        self.log.error "Ignoring beacon message from %s with header: %p" % [ sender, prefix ]
        return
    end

    unless payload
        self.log.error "Ignoring beacon message from %s with no data" % [ sender ]
        return
    end

    node = self.decode_broadcast_data( sender, payload )
    return unless node

    # Stamp the node with where and when it was seen, then freeze it before
    # it goes into the table.
    node.last_seen_at = Time.now
    node.ip = sender
    node.freeze

    self.log.debug "Updating %s: from %d bytes of encoded data = %p" %
        [ sender, payload.bytesize, node ]
    self.nodes[ node.device_id ] = node
end

Public Instance Methods

beacon()

The zbeacon that is used for discovery

   # File lib/ravn/tactical/discovery.rb
80 singleton_attr_reader :beacon
expiration_timer()

The Concurrent::TimerTask that controls expiration of discovery data

   # File lib/ravn/tactical/discovery.rb
94 singleton_attr_reader :expiration_timer
hostname()

The hostname that can be used to connect to this node

   # File lib/ravn/tactical/discovery.rb
85 singleton_attr_reader :hostname
local_update_timer()

The Concurrent::TimerTask that controls updating of local discovery data

   # File lib/ravn/tactical/discovery.rb
90 singleton_attr_reader :local_update_timer
nodes()

The table of nodes built from compute pack broadcasts, keyed by device ID.

   # File lib/ravn/tactical/discovery.rb
98 singleton_attr_reader :nodes
thread()

The thread that listens for incoming broadcasts and populates the node list.

    # File lib/ravn/tactical/discovery.rb
103 singleton_attr_reader :thread