Ravn::Tactical::
Discovery
module
Functions for discovery of other compute packs on the LAN.
Refs: - api.zeromq.org/czmq3-0:zbeacon
- BEACON_KEYS
The sorted list of discovery hash keys.
- BEACON_PREFIX
The beacon “type”, for filtering
- BROADCAST_INTERVAL
How often to broadcast, in seconds
- DISCOVERY_DATA_TIMEOUT
How long since the last beacon was received from a node before its discovery data is expired.
- EXPIRATION_INTERVAL
How often to check for expired discovery data
- LOCAL_UPDATE_INTERVAL
How often to refresh local node data in the discovery table
- POLL_TIMEOUT
How many seconds to wait for an incoming broadcast in the poll loop
- PORT
Port to use for discovery
broadcast_data( overrides={} )
Return the String of data that is used for the broadcast.
310 def self::broadcast_data( overrides={} )
311 data = self.node_data.merge( overrides )
312
313 packed_data = MessagePack.pack( data.values )
314 encoded = BEACON_PREFIX + ' ' + [ packed_data ].pack( 'm0' )
315 self.log.debug "Encoded broadcast is %d bytes" % [ encoded.bytesize ]
316
317 if encoded.bytesize > CZTop::Beacon::MAX_BEACON_DATA
318 raise "discovery broadcast is too large (%d bytes > %d)" % [
319 encoded.bytesize,
320 CZTop::Beacon::MAX_BEACON_DATA
321 ]
322 end
323
324 return encoded
325 end
Create and return a CZTop::Beacon that will be used to discover other nodes.
134 def self::create_beacon
135 beacon = CZTop::Beacon.new
136 hostname = beacon.configure( PORT )
137
138 beacon.verbose! if $DEBUG || $VERBOSE
139 beacon.subscribe( BEACON_PREFIX )
140
141 self.log.debug "Publishing: %p" % [ self.broadcast_data ]
142 beacon.publish( self.broadcast_data, BROADCAST_INTERVAL * 1000 )
143
144 self.log.info " beacon started on %s:%d" % [ hostname, PORT ]
145
146 return beacon, hostname
147 end
decode_broadcast_data( ip, encoded_data )
Decode the specified encoded_data
, sanity-check it, and return it if it appears valid.
330 def self::decode_broadcast_data( ip, encoded_data )
331 packed_data = encoded_data.unpack1( 'm0' ) or
332 raise ArgumentError, "invalid encoded data"
333 unpacked = MessagePack.unpack( packed_data )
334
335 node_data = BEACON_KEYS.zip( unpacked ).to_h
336 node = self.make_merged_node( node_data )
337
338 unless node.valid_for_discovery?
339 self.log.error "Ignoring broadcast from %s: %s" %
340 [ ip, node.errors.full_messages.join('; ') ]
341 return nil
342 end
343
344 return node
345 rescue ArgumentError, MessagePack::MalformedFormatError => err
346 self.log.error "Malformed broadcast: %p: %s" % [ packed_data, err.message ]
347 return nil
348 end
expire_discovery_data( * )
Expire discovery data of nodes that haven’t beaconed in a while.
272 def self::expire_discovery_data( * )
273 stale_time = Time.now - DISCOVERY_DATA_TIMEOUT
274 self.nodes.delete_if do |device_id, node|
275 if node.last_seen_at <= stale_time
276 self.log.debug "Expiring discovery data for %s" % [ device_id ]
277 true
278 end
279 end
280 end
Loop over the beacon, adding nodes for incoming broadcasts.
196 def self::handle_broadcasts
197 Thread.current.name = "Discovery Broadcaster" unless Thread.current == Thread.main
198 self.log.info "Waiting for broadcasts."
199 poller = CZTop::Poller::ZPoller.new( self.beacon.actor )
200
201 self.local_update_timer.execute
202 self.expiration_timer.execute
203 while self.beacon && !self.beacon.actor.dead?
204 self.read_node_broadcast if poller.wait( POLL_TIMEOUT * 1000 )
205 end
206 self.expiration_timer.shutdown
207 self.local_update_timer.shutdown
208
209 self.log.info "Stopped waiting for broadcasts."
210 end
Return a Concurrent::TimerTask that will expire old discovery data.
162 def self::make_expiration_timer
163 timer = Concurrent::TimerTask.new( &self.method(:expire_discovery_data) )
164 timer.execution_interval = EXPIRATION_INTERVAL
165 timer.add_observer( Ravn::LoggingTaskObserver.new(:discovery_expiration_timer) )
166
167 return timer
168 end
make_local_update_timer()
Return a Concurrent::TimerTask that will update node discovery data for the local node.
152 def self::make_local_update_timer
153 timer = Concurrent::TimerTask.new( run_now: true, &self.method(:update_local_info) )
154 timer.execution_interval = LOCAL_UPDATE_INTERVAL
155 timer.add_observer( Ravn::LoggingTaskObserver.new(:local_update_timer) )
156
157 return timer
158 end
make_merged_node( discovery_data )
Make a Ravn::Tactical::Node
out of the given discovery_data
, merging it with the database version if it exists.
353 def self::make_merged_node( discovery_data )
354 discovered_node = Ravn::Tactical::Node.new( discovery_data )
355
356 if (node = Ravn::Tactical::Node[ device_id: discovered_node.device_id ])
357 self.log.debug "Merging discovery data with persisted data"
358 node.merge_discovery_data( discovered_node )
359 else
360 self.log.debug "Node %s is not registered" % [ discovered_node.device_id ]
361 node = discovered_node
362 end
363
364 return node
365 end
Return a Hash of this node’s discovery information.
284 def self::node_data
285 @node_data ||= begin
286 mission = Ravn::Tactical::Mission.current
287 self.log.debug "Mission for discovery is: %p" % [ mission ]
288
289 callsign = begin
290 Ravn::BDE.callsign
291 rescue Ravn::BDE::ValidationError => err
292 self.log.debug "Ignoring validation error while fetching current callsign."
293 Ravn::BDE.get_hostbased_callsign
294 end
295
296 BEACON_KEYS.sort.zip([
297 callsign,
298 Ravn::Tactical.control_device_id,
299 Ravn.device_id,
300 mission&.chksum,
301 Zyre.z85_encode( Ravn::Crypto.public_key ),
302 Ravn.time.to_i,
303 Ravn::Tactical::VERSION
304 ]).to_h.freeze
305 end
306 end
Reads an incoming broadcast from a node and updates the node table with it.
214 def self::read_node_broadcast
215 if self.beacon
216 msg = self.beacon.receive
217 self.update_node( msg )
218 end
219 rescue CZTop::Actor::DeadActorError => err
220 self.log.debug "Couldn't read broadcast: beacon actor died."
221 end
Reset discovery, clearing any discovered pairs.
186 def self::reset
187 self.log.info "Resetting discovery info."
188 self.stop
189 self.nodes.clear
190
191 @node_data = nil
192 end
Start discovery.
114 def self::start
115 raise "Already started?! (thread = %p, beacon = %p)" % [ self.thread, self.beacon ] if
116 self.started?
117
118 if (iface_name = self.interface)
119 self.log.debug "Will use the %p interface." % [ iface_name ]
120 ENV['ZSYS_INTERFACE'] = iface_name
121 end
122
123 self.log.info "Starting discovery beacon."
124 @beacon, @hostname = self.create_beacon
125 self.log.debug "Beacon is at: %s" % [ @hostname ]
126 @local_update_timer = self.make_local_update_timer
127 @expiration_timer = self.make_expiration_timer
128
129 return @thread = Thread.new( &self.method(:handle_broadcasts) )
130 end
Return true
if Discovery
has been started.
108 def self::started?
109 return (self.thread || self.beacon) ? true : false
110 end
Stop discovery.
172 def self::stop
173 self.log.info "Stopping discovery beacon."
174
175 @beacon&.terminate
176 @beacon = nil
177
178 @thread.join( 2 ) if @thread&.alive?
179 @thread = nil
180
181 @node_data = nil
182 end
Update the information about the local node in the node table.
225 def self::update_local_info( * )
226 self.log.debug "Updating local node discovery info"
227 @node_data = nil
228
229 self.beacon&.publish( self.broadcast_data, BROADCAST_INTERVAL * 1000 )
230
231 ip = IPAddr.new( self.hostname )
232 last_seen_at = Time.now
233
234 discovery_data = self.node_data.merge( ip:, last_seen_at: )
235 node = self.make_merged_node( discovery_data )
236
237 self.nodes[ Ravn.device_id ] = node
238 rescue CZTop::Actor::DeadActorError => err
239 self.log.error "Couldn't send update; beacon is shutting down."
240 end
Update the node that sent the given zbeacon message
.
244 def self::update_node( message )
245 ip_frame = message[ 0 ] or raise "malformed zbeacon message: no frames!"
246 data_frame = message[ 1 ] or raise "malformed zbeacon message: no data frame!"
247
248 ip = IPAddr.new( ip_frame )
249 header, encoded_data = data_frame.split( ' ', 2 )
250
251 if header != BEACON_PREFIX
252 self.log.error "Ignoring beacon message from %s with header: %p" % [ ip, header ]
253 return
254 elsif !encoded_data
255 self.log.error "Ignoring beacon message from %s with no data" % [ ip ]
256 return
257 end
258
259 node = self.decode_broadcast_data( ip, encoded_data ) or return
260
261 node.last_seen_at = Time.now
262 node.ip = ip
263 node.freeze
264
265 self.log.debug "Updating %s: from %d bytes of encoded data = %p" %
266 [ ip, encoded_data.bytesize, node ]
267 self.nodes[ node.device_id ] = node
268 end
The zbeacon that is used for discovery
80 singleton_attr_reader :beacon
The Convurrent::TimerTask that controls expiration of discovery data
94 singleton_attr_reader :expiration_timer
The hostname that can be used to connect to this node
85 singleton_attr_reader :hostname
The Convurrent::TimerTask that controls updating of local discovery data
90 singleton_attr_reader :local_update_timer
The table of node compute pack broadcasts keyed by ipaddr.
98 singleton_attr_reader :nodes
The thread that listens for incoming broadcasts and populates the node list.
103 singleton_attr_reader :thread