A broker for participating in a P2P event mesh.
- DEFAULT_INITIAL_GROUPS
The default Zyre groups to join when starting up
- DEFAULT_OPTIONS
The default options used by the constructor.
- INTROSPECTION_EVENTS
Event types that ignore protocol matching because they’re about the local node.
- PROTECTED_GROUPS
Group names which are invalid
- WILDCARD_EVENT_TYPE
Event type that will hook every event type. Should only be used for debugging/analysis tools.
- callbacks R
A Hash of callbacks registered for broker messages. Values are Sets of
call-able objects.
- interface_state R
The last known state of the broker’s network interface.
- interface_timer R
The Concurrent::TimerTask that checks the interface to be sure it’s still present
- node R
The Zyre node associated with the broker
- node_name R
The name of the broker’s node on the network
- peer_headers R
The cached headers of each peer, keyed by its UUID.
- peer_uuids R
A mapping of peer names to their UUIDs
- thread R
The Thread object running the broker if it’s been started
- triggers R
A Hash of callbacks registered for a single broker message. Values are Sets of call-able objects.
Load the extension with the given name
and add it to the Broker
class.
# Load the extension with the given +name+ and prepend it to the receiver.
#
# The extension is required from <tt>ravn/net/broker/<name></tt>. If the
# require returns a true value, the constant named by camel-casing +name+
# (e.g. +event_history+ becomes +EventHistory+) is looked up and prepended.
# Nothing is prepended, and +nil+ is returned, when the require returns a
# false value.
#
# Any StandardError or LoadError raised along the way is logged and re-raised
# as an exception of the same class whose message names the extension.
def self::load_extension( name )
  path = "ravn/net/broker/%s" % [ name ]
  return unless require( path )

  constant_name = name.to_s.gsub( /(?:^|_)([a-z])/ ) { $1.upcase }
  prepend( const_get(constant_name) )
rescue StandardError, LoadError => err
  self.log.error "%p while loading the %s extension: %s" % [ err.class, name, err.message ]
  raise err.exception( "while loading the %s extension: %s" % [name, err.message] )
end
load_extensions( *names )
Load extensions with the given names
.
# Load each of the extensions in +names+, in order, via ::load_extension.
# Returns the Array of +names+.
def self::load_extensions( *names )
  names.each( &self.method(:load_extension) )
end
new( node_name=nil, options={} ) { |node| ... }
Create a new Broker
that will use a node with the given node_name
.
# Set up a new Broker around a Zyre node named +node_name+ (a random name
# from ::random_node_name is used if none is given).
#
# The +options+ are merged over DEFAULT_OPTIONS, kept in @options and applied
# with #set_options. The peer caches and the callback and trigger registries
# start out empty, and the interface timer is created but not executed here.
# If a block is given, it is called with the new node once setup is complete.
def initialize( node_name=nil, options={} )
  @options = DEFAULT_OPTIONS.merge( options )

  @node_name = node_name || self.class.random_node_name
  @node = Zyre::Node.new( @node_name )

  @thread = nil
  @running = false

  @peer_headers = {}
  @peer_uuids = {}
  @callbacks = {}
  @triggers = {}

  @interface_state = {}
  @interface_timer = self.setup_interface_timer

  # The node has to exist before options are applied, since applying them
  # may join groups on it.
  self.set_options( **@options )

  yield( @node ) if block_given?
end
Return a unique random node name.
# Generate a node name made of a fixed prefix followed by 16 random bytes
# rendered as hexadecimal.
def self::random_node_name
  format( "ravn-net-broker-%s", SecureRandom.hex(16) )
end
start( node_name=nil, **options, &block )
Create a new Broker (with the optional node_name), start it, and return it. If a block is given, it’s called with the new broker immediately before it’s started.
# Build a new instance from +node_name+ and +options+, hand it to the
# optional +block+, then start it. Returns the started instance.
def self::start( node_name=nil, **options, &block )
  new( node_name, **options ).tap do |broker|
    block&.call( broker )
    broker.start
  end
end
all_callbacks_for( event_type )
Return a superset of the callbacks and triggers for the given event_type
.
# Collect every callable that should run for +event_type+ into a new Set:
# the handlers and triggers registered for that type, followed by those
# registered for WILDCARD_EVENT_TYPE. The registries are not modified.
def all_callbacks_for( event_type )
  registries = [ self.callbacks, self.triggers ]
  keys = [ event_type, WILDCARD_EVENT_TYPE ]

  keys.product( registries ).each_with_object( Set.new ) do |(key, registry), superset|
    superset.merge( registry[key] ) if registry.key?( key )
  end
end
Check the current status of the Zyre interface. If the interface is no longer detected, stop the node.
# Timer callback: refresh the interface state and stop the broker if the
# link is no longer reported. Any arguments are accepted and ignored.
# The check is skipped entirely when a gossip endpoint is configured.
def check_interface( * )
  self.log.debug "Checking Zyre interface."
  return if Ravn::Net.gossip

  self.update_interface_state
  return if self.interface_state[ :link ]

  self.log.warn "Zyre interface is gone. Stopping."
  self.stop if self.running?
end
dispatch_zre_event( zre_event )
Dispatch the given zre_event
according to its type and registered callbacks.
# Dispatch the given +zre_event+ to every handler and trigger registered for
# its event type (including those registered for WILDCARD_EVENT_TYPE).
#
# Returns the number of callables that were invoked. Returns +nil+ if the
# event should not be propagated or could not be wrapped in a message, and
# +false+ if no event type could be derived from it.
def dispatch_zre_event( zre_event )
  return unless self.should_propagate?( zre_event )

  event_type = Ravn::Net::Protocol.type_from_zre_event( zre_event ) or
    return ignore_and_log_reason( "Invalid type." )

  handlers = self.all_callbacks_for( event_type )
  self.log.debug "%d handler/s for %p events" % [ handlers.size, event_type ]

  unless handlers.empty?
    message = self.wrap_zre_event( zre_event ) or return nil

    handlers.each do |callback|
      self.log.debug "calling %p with %p" % [ callback, message ]
      callback.call( message )
    end

    # Triggers are one-shot: drop the ones that just fired. Wildcard triggers
    # are part of +handlers+ too, so they have to be removed from their own
    # registry entry or they would fire again on every later event.
    [ event_type, WILDCARD_EVENT_TYPE ].each do |key|
      self.triggers[ key ]&.subtract( handlers )
    end
  end

  return handlers.size
end
endpoint_from_gossip( gossip_uri )
Generate an endpoint for peer communication, using the same network segment that is used to reach the gossip server. (Normally, this is automatic when attaching to an interface - this is explicit to support tunneling interfaces that Zyre normally won’t “see” as valid.)
# Build a <tt>tcp://address:port</tt> endpoint String for this node.
#
# The address is the local address a UDP socket reports after being connected
# to the host of +gossip_uri+; the port is picked at random from
# 10000..65534. The chosen endpoint is logged and returned.
def endpoint_from_gossip( gossip_uri )
  local_address = UDPSocket.open do |sock|
    sock.connect( gossip_uri.host, 1 )
    sock.addr.last
  end

  endpoint = "tcp://%s:%s" % [ local_address, rand(10000..65534) ]
  self.log.info "Advertising %p as my peer endpoint." % [ endpoint ]

  endpoint
end
The Set of groups the broker will join, or is a member of if it’s already running.
# The groups reported by the underlying node as its own.
def groups
  self.node.own_groups
end
handle_enter_zre_event( zre_event ) { || ... }
ZRE event handler – handle ENTER events.
# ZRE event handler for ENTER events.
#
# Normalizes the headers carried by +zre_event+ and caches a frozen copy
# under the peer's UUID, records the peer's name-to-UUID mapping, and then
# yields to the caller's block. Returns whatever the block returns.
def handle_enter_zre_event( zre_event )
  self.log.info( zre_event )

  normalized = Ravn::Net::Protocol.normalize_headers( zre_event.headers )
  uuid = zre_event.peer_uuid

  self.log.info "Caching headers for node %s" % [ uuid ]
  self.peer_headers[ uuid ] = normalized.freeze
  self.peer_uuids[ zre_event.peer_name ] = uuid

  self.log.debug "Peer headers cached for: %p" % [ self.peer_headers.keys ]
  self.log.info "Peers are now: %p" % [ self.peer_uuids ]

  yield
end
handle_evasive_zre_event( zre_event )
handle_exit_zre_event( zre_event ) { || ... }
ZRE event handler – handle EXIT events.
# ZRE event handler for EXIT events.
#
# Yields to the caller's block first, while the departing peer's headers are
# still cached, and then forgets the peer's headers and name mapping.
def handle_exit_zre_event( zre_event )
  self.log.info( zre_event )
  yield

  departed = zre_event.peer_uuid
  self.peer_headers.delete( departed )
  self.peer_uuids.delete_if { |_name, uuid| departed == uuid }
end
handle_join_zre_event( zre_event )
handle_leader_zre_event( zre_event )
handle_leave_zre_event( zre_event )
handle_shout_zre_event( zre_event )
handle_silent_zre_event( zre_event )
handle_stop_zre_event( zre_event ) { || ... }
ZRE event handler – handle STOP events.
# ZRE event handler for STOP events: log the event, yield to the caller's
# block, and then stop the broker. Returns the result of #stop.
def handle_stop_zre_event( zre_event )
  log.info( zre_event )
  yield

  log.debug "Setting running to false."
  stop
end
handle_whisper_zre_event( zre_event )
handle_zre_event( zre_event )
Do any local handling for the given zre_event
.
# Route +zre_event+ to the public <tt>handle_<type>_zre_event</tt> method
# named after its downcased type. The handler is given a block which
# dispatches the event to registered callbacks when the handler yields.
def handle_zre_event( zre_event )
  handler = "handle_%s_zre_event" % [ zre_event.type.downcase ]
  self.public_send( handler, zre_event ) { self.dispatch_zre_event(zre_event) }
end
ignore_and_log_reason( reason )
Log a reason for ignoring an event and return false.
# Log +reason+ at debug level as the cause for ignoring an event.
# Always returns +false+ so it can be used directly as a guard's result.
def ignore_and_log_reason( reason )
  self.log.debug( "Ignoring event: %s" % [reason] )
  false
end
Return the details of inspect output.
# Describe the broker as <tt>name {uuid} [groups]</tt>, with the group names
# sorted and comma-separated.
def inspect_details
  name = self.node_name
  uuid = self.node.uuid
  group_list = self.groups.sort.join( ', ' )

  format( "%s {%s} [%s]", name, uuid, group_list )
end
is_introspection_event?( zre_event )
Returns true
for zre_events
that should propagate because they’re about the local node.
# Whether the type of +zre_event+ is one of the INTROSPECTION_EVENTS.
def is_introspection_event?( zre_event )
  event_type = zre_event.type
  INTROSPECTION_EVENTS.include?( event_type )
end
Join the specified new_group
.
# Join the group named by +new_group+ (converted with +to_s+).
#
# Returns +true+ if the node was asked to join, or +false+ if the broker is
# already a member. Raises an ArgumentError if the name is one of the
# PROTECTED_GROUPS.
def join_group( new_group )
  group_name = new_group.to_s

  if PROTECTED_GROUPS.include?( group_name )
    raise ArgumentError, "can't join protected group %p" % [ group_name ]
  end

  if self.groups.include?( group_name )
    self.log.debug "Can't join `%s`: Already a member." % [ group_name ]
    return false
  end

  self.log.info "Joining `%s` network group." % [ group_name ]
  self.node.join( group_name )
  true
end
join_initial_groups( *names )
Set up the Broker’s node to immediately join the groups with the given names
on start.
# Ask the node to join every group in +names+ (nested Arrays are flattened).
# The node is called directly, so no protected-group or membership checks are
# applied here. Returns the flattened Array of names.
def join_initial_groups( *names )
  names.flatten.each { |group_name| self.node.join(group_name) }
end
leave_group( current_group )
Leave the current_group
if the broker is a member of it.
# Leave the group named by +current_group+ if the broker is a member of it.
#
# The name is converted with +to_s+ first, as #join_group does, so a group
# joined as a Symbol can be left using the same Symbol.
#
# Returns +true+ if the node was asked to leave the group, +false+ if the
# broker was not a member.
def leave_group( current_group )
  current_group = current_group.to_s

  if self.groups.include?( current_group )
    self.log.info "Leaving `%s` network group." % [ current_group ]
    self.node.leave( current_group )
    return true
  else
    self.log.debug "Can't leave `%s`: Not a member." % [ current_group ]
    return false
  end
end
log_zre_event( zre_event ) { || ... }
Generic ZRE event handler – just log the event if debugging is enabled.
# Generic ZRE event handler: pass the event to the debug log in block form,
# then yield to the caller's block. Returns whatever the block returns.
def log_zre_event( zre_event )
  log.debug { zre_event }
  yield
end
off( event_type, callback )
Unregister the given callback
for events of the given event_type
; this removes both handlers and triggers.
# Unregister the given +callback+ for events of the given +event_type+,
# removing it from both the handlers and the triggers for that type.
#
# A message is logged only for a registration that was actually removed.
# (+Set#delete?+ is used because +Set#delete+ returns the set whether or not
# the member was present.) The return value is not meaningful.
def off( event_type, callback )
  if self.callbacks[ event_type ]&.delete?( callback )
    self.log.info "Will no longer call %p for %p events." % [ callback, event_type ]
  end

  if self.triggers[ event_type ]&.delete?( callback )
    self.log.info "Cancelled trigger %p for %p events." % [ callback, event_type ]
  end
end
on( *event_types, &callback )
Register the given callback
for events of the given event_types
.
# Register the given +callback+ as a handler for each of the given
# +event_types+. Returns the +callback+.
#
# A message is logged only for types the callback was not already registered
# for. (+Set#add?+ is used because +Set#add+ returns the set even when the
# member was already present.)
def on( *event_types, &callback )
  event_types.each do |event_type|
    self.callbacks[ event_type ] ||= Set.new
    if self.callbacks[ event_type ].add?( callback )
      self.log.info "Will call %p for %p events." % [ callback, event_type ]
    end
  end

  return callback
end
on_all_events( &callback )
on_any_event( &callback )
Register the given callback
for all events. NOTE: this should not be used for anything other than diagnostic tools.
# Register the given +callback+ under WILDCARD_EVENT_TYPE, i.e. for all
# events. Returns the +callback+.
#
# A warning is logged only when the callback was not already registered.
# (+Set#add?+ is used because +Set#add+ returns the set even when the member
# was already present.)
def on_any_event( &callback )
  self.callbacks[ WILDCARD_EVENT_TYPE ] ||= Set.new
  if self.callbacks[ WILDCARD_EVENT_TYPE ].add?( callback )
    self.log.warn "Will call %p for ALL events." % [ callback ]
  end

  return callback
end
once( event_type, &callback )
Register the given callback
as a trigger that is called once for the first event of the given event_type
and then unregistered. Registering a callback as both a trigger and a handler will still only call it once per event of the given event_type.
# Register the given +callback+ as a trigger for the given +event_type+.
# Returns the +callback+.
#
# A message is logged only when the callback was not already a trigger for
# that type. (+Set#add?+ is used because +Set#add+ returns the set even when
# the member was already present.)
def once( event_type, &callback )
  self.triggers[ event_type ] ||= Set.new
  if self.triggers[ event_type ].add?( callback )
    self.log.info "Will call %p for the first %p event that occurs." % [ callback, event_type ]
  end

  return callback
end
peer_speaks_same_protocol?( peer_uuid )
Returns true
if the peer with the given peer_uuid
speaks the same protocol version we do.
# Check the cached headers of the peer with the given +peer_uuid+ for a
# protocol version that Ravn::Net::Protocol supports.
#
# Returns +true+ when it does. Returns +false+ (logging the reason) when no
# headers are cached for the peer, when they carry no
# +:helios_protocol_version+ entry, or when that version is unsupported.
def peer_speaks_same_protocol?( peer_uuid )
  headers = self.peer_headers[ peer_uuid ]
  unless headers
    return ignore_and_log_reason( "No headers registered for peer (%p)." % [self.peer_headers.keys] )
  end

  protocol_version = headers[ :helios_protocol_version ]
  unless protocol_version
    return ignore_and_log_reason( "Peer doesn't have protocol version header." )
  end

  self.log.debug "Event sent from peer speaking version %p of the protocol." %
    [ protocol_version ]
  return true if Ravn::Net::Protocol.supported_version?( protocol_version )

  # Note that the version reported here is the local one, not the peer's.
  ignore_and_log_reason "Peer doesn't speak the same protocol version (%d)" %
    [ Ravn::Net::Protocol.version ]
end
True if the broker is running, false if it hasn’t been started or is shutting down.
# Declares the +running+ flag of the broker. Code in this file reads it via
# +running?+ (the event loop and the interface check) and writes it via
# +running=+ (when the event loop starts and when the broker stops).
# NOTE(review): +attr_predicate_accessor+ is not defined in this view;
# presumably it comes from a mixin -- confirm.
attr_predicate_accessor :running
send_message( *messages )
send_messages( *messages )
Send the specified messages
.
# Encode and send each of the given +messages+ (nested Arrays are flattened).
#
# A message whose encoded group is <tt>'direct'</tt> is whispered to the peer
# named by its +to+ field; it is dropped with a warning if no peer of that
# name is known. Any other message is shouted to its group. Raises
# Ravn::Net::MessageError for a direct message without a +to+ field; messages
# before it in the list will already have been sent.
def send_messages( *messages )
  messages.flatten.each do |message|
    group, type, data = Ravn::Net::Protocol.encode( message )

    unless group == 'direct'
      self.log.debug "Broadcasting %s message to the %s group (%d bytes)." %
        [ type, group, data.bytesize ]
      self.node.shout( group, type, data )
      next
    end

    recipient = message.to
    raise Ravn::Net::MessageError, "can't send a direct message without a `to' field" unless recipient

    peer_uuid = self.peer_uuids[ recipient ]
    if peer_uuid
      self.log.debug "Sending direct %s message to %s (%s)" % [ type, recipient, peer_uuid ]
      self.node.whisper( peer_uuid, type, data )
    else
      self.log.warn "Dropping message to %p: no such peer" % [ recipient ]
    end
  end
end
Set up the broker according to the given options
Hash.
# Apply the given +options+ to the broker. The only option acted on here is
# +:groups+, which is passed to #join_initial_groups when present.
def set_options( **options )
  self.log.info "Setting options: %p" % [ options ]

  initial_groups = options[ :groups ]
  self.join_initial_groups( initial_groups ) if initial_groups
end
Create the timer for interface checks.
# Build the timer task that runs #check_interface, with an execution interval
# of 5 and a logging observer attached. The task is returned without being
# executed.
def setup_interface_timer
  self.log.info "Starting zyre interface checks"

  Concurrent::TimerTask.new( &self.method(:check_interface) ).tap do |task|
    task.execution_interval = 5
    task.add_observer( Ravn::LoggingTaskObserver.new(:interface_timer) )
  end
end
should_propagate?( zre_event )
Examine the given zre_event
and return true
if it looks like it should be propagated to message handlers.
# Decide whether +zre_event+ should be handed to message handlers.
#
# STOP events always propagate and introspection events never do. Any other
# event propagates only if the sending peer speaks a supported protocol
# version.
#
# NOTE(review): the description of #is_introspection_event? says those events
# "should propagate", but they are rejected here -- confirm which is intended.
def should_propagate?( zre_event )
  if zre_event.type == :STOP
    true
  elsif self.is_introspection_event?( zre_event )
    false
  else
    self.peer_speaks_same_protocol?( zre_event.peer_uuid )
  end
end
Join the network and start consuming ZRE events.
# Configure the node and begin consuming events on a new thread.
#
# The node is given the protocol's headers and the configured interface. If a
# gossip endpoint is configured, the node also gets an explicit endpoint and
# is connected to the gossip server. The interface timer is then executed and
# the event-handling thread is started, stored in @thread and returned.
def start
  interface = Ravn::Net.interface
  self.log.info "Starting node on %s." % [ interface ]

  Zyre.disable_zsys_handler
  self.node.headers = Ravn::Net::Protocol.node_headers
  self.node.interface = interface

  gossip = Ravn::Net.gossip
  if gossip
    self.log.warn "Discovering peers via gossip: %p" % [ gossip.to_s ]
    self.node.endpoint = self.endpoint_from_gossip( gossip )
    self.node.gossip_connect( gossip.to_s )
  end

  self.interface_timer.execute

  @thread = Thread.new( &self.method(:start_handling_events) )
end
Thread work method – start reading events as they come in and dispatching them appropriately as Messages.
# Thread work method: start the node and handle ZRE events as they arrive
# for as long as the broker is flagged as running.
#
# A falsy result from the node's +recv+ is logged as an interrupted read and
# the read is retried. Returns the elapsed monotonic time once the loop ends.
def start_handling_events
  worker = Thread.current
  worker.abort_on_exception = true
  worker.name = "Net::Broker event handler"

  started_at = Ravn.monotonic_time

  self.log.info "Starting Zyre %s node %s(%s)." %
    [ Zyre.zyre_version, self.node.name, self.node.uuid ]
  self.running = true
  self.node.start
  self.log.info "Started handling events."

  while self.running?
    zre_event = self.node.recv

    unless zre_event
      self.log.warn "Read was interrupted: trying again."
      next
    end

    self.log.debug "Handling a %p ZRE event." % [ zre_event.type ]
    self.handle_zre_event( zre_event )
  end

  self.log.info "Done handling events."
  Ravn.monotonic_time - started_at
end
Returns true
if the Broker’s been started.
# Whether the broker's thread exists and is alive. Returns +nil+ if no thread
# has been created yet.
def started?
  worker = self.thread
  worker.nil? ? nil : worker.alive?
end
Stop listening for events and leave the network.
# Shut down the interface timer, clear the running flag, and stop the node if
# there is one. Returns the result of stopping the node, or +nil+ without one.
def stop
  self.log.info "Stopping node."

  self.interface_timer.shutdown
  self.running = false

  zyre_node = self.node
  zyre_node.stop unless zyre_node.nil?
end
Build a status hash for external interrogation.
# Rebuild @interface_state with the configured interface and whether it has a
# link, and return the new Hash.
#
# The link is always reported as +true+ when a gossip endpoint is configured;
# otherwise it reflects whether Zyre lists the configured interface.
def update_interface_state
  state = { interface: Ravn::Net.interface }

  state[ :link ] =
    if Ravn::Net.gossip
      true
    else
      Zyre.interfaces.include?( Ravn::Net.interface )
    end

  @interface_state = state
end
wrap_zre_event( zre_event )
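Decode the given zre_event into a message, using the cached headers of the peer it came from. Returns nil (after logging a warning) if the event can’t be decoded.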
# Decode +zre_event+ into a message using the headers cached for the peer it
# came from, and return the message.
#
# Returns +nil+ after logging a warning if decoding raises a
# Ravn::Net::MessageError.
def wrap_zre_event( zre_event )
  sender_headers = self.peer_headers[ zre_event.peer_uuid ]

  Ravn::Net::Protocol.decode( sender_headers, zre_event ).tap do |decoded|
    self.log.debug { decoded }
  end
rescue Ravn::Net::MessageError => err
  self.log.warn "%p while wrapping a ZRE event: %s" % [ err.class, err.message ]
  nil
end