Ravn::Net::

Broker class

A broker for participating in a P2P event mesh.

Constants

DEFAULT_INITIAL_GROUPS

The default Zyre groups to join when starting up

DEFAULT_OPTIONS

The default options used by the constructor.

INTROSPECTION_EVENTS

Event types that ignore protocol matching because they’re about the local node.

PROTECTED_GROUPS

Group names which are reserved and cannot be joined via join_group.

WILDCARD_EVENT_TYPE

Event type that will hook every event type. Should only be used for debugging/analysis tools.

Attributes

callbacks R

A Hash of callbacks registered for broker messages. Values are Sets of call-able objects.

interface_state R

The last known state of the broker’s network interface.

interface_timer R

The Concurrent::TimerTask that checks the interface to be sure it’s still present

node R

The Zyre node associated with the broker

node_name R

The name of the broker’s node on the network

peer_headers R

The cached headers of each peer, keyed by its UUID.

peer_uuids R

A mapping of peer names to their UUIDs

thread R

The Thread object running the broker if it’s been started

triggers R

A Hash of callbacks registered for a single broker message. Values are Sets of call-able objects.

Public Class Methods

load_extension( name )

Load the extension with the given name and add it to the Broker class.

# File lib/ravn/net/broker.rb, line 76
# Load the Broker extension called +name+ from `ravn/net/broker/<name>` and
# prepend the module it defines (the CamelCased form of +name+) onto the
# receiver. Nothing is prepended when `require` reports the file was already
# loaded. Any StandardError or LoadError is logged and then re-raised as the
# same exception class with a message that names the extension.
def self::load_extension( name )
        feature = format( "ravn/net/broker/%s", name )
        return nil unless require( feature )

        # snake_case -> CamelCase, e.g. "foo_bar" -> "FooBar"
        constant_name = name.to_s.gsub( /(?:^|_)([a-z])/ ) { Regexp.last_match( 1 ).upcase }
        prepend( const_get(constant_name) )
rescue StandardError, LoadError => err
        self.log.error( format("%p while loading the %s extension: %s", err.class, name, err.message) )
        raise err.exception( format("while loading the %s extension: %s", name, err.message) )
end
load_extensions( *names )

Load extensions with the given names.

# File lib/ravn/net/broker.rb, line 68
# Load each of the extensions in +names+, in order, via ::load_extension.
# Returns the Array of names that was given.
def self::load_extensions( *names )
        names.each {|extension_name| self.load_extension(extension_name) }
end
new( node_name=nil, options={} ) { |node| ... }

Create a new Broker that will use a node with the given node_name.

# File lib/ravn/net/broker.rb, line 91
# Set up a new Broker around a Zyre node named +node_name+ (a random name is
# generated when none is given). +options+ are layered over DEFAULT_OPTIONS
# and applied via #set_options. If a block is given, the freshly-created
# Zyre node is yielded to it once setup is complete.
def initialize( node_name=nil, options={} )
        @options   = DEFAULT_OPTIONS.merge( options )
        @node_name = node_name || self.class.random_node_name
        @node      = Zyre::Node.new( @node_name )

        # Peer bookkeeping
        @peer_headers = {}
        @peer_uuids   = {}

        # Handler registries
        @callbacks = {}
        @triggers  = {}

        # Run state
        @thread          = nil
        @running         = false
        @interface_state = {}
        @interface_timer = self.setup_interface_timer

        self.set_options( **@options )

        yield( @node ) if block_given?
end
random_node_name()

Return a unique random node name.

# File lib/ravn/net/broker.rb, line 51
# Build a node name made of the "ravn-net-broker-" prefix followed by
# 16 random bytes rendered as hex.
def self::random_node_name
        suffix = SecureRandom.hex( 16 )
        return format( "ravn-net-broker-%s", suffix )
end
start( node_name=nil, **options, &block )

Create a new Broker (with the optional node_name), start it, and return it. If a block is given, it’s called with the new broker immediately before it’s started.

# File lib/ravn/net/broker.rb, line 59
# Build a Broker from +node_name+ and +options+, hand it to +block+ (if one
# was given) so the caller can configure it, then start it. Returns the
# new broker.
def self::start( node_name=nil, **options, &block )
        new( node_name, **options ).tap do |broker|
                block&.call( broker )
                broker.start
        end
end

Public Instance Methods

all_callbacks_for( event_type )

Return a superset of the callbacks and triggers for the given event_type.

# File lib/ravn/net/broker.rb, line 445
# Collect every handler that applies to +event_type+ into a new Set: the
# callbacks and triggers registered for the type itself, followed by those
# registered for WILDCARD_EVENT_TYPE. The registries themselves are not
# modified.
def all_callbacks_for( event_type )
        registries = [ self.callbacks, self.triggers ]
        combined   = Set.new

        [ event_type, WILDCARD_EVENT_TYPE ].each do |type|
                registries.each do |registry|
                        combined.merge( registry[type] ) if registry.key?( type )
                end
        end

        return combined
end
check_interface( * )

Check the current status of the Zyre interface. If the interface is no longer detected, stop the node.

# File lib/ravn/net/broker.rb, line 163
# Periodic interface check; any arguments are accepted and ignored. Does
# nothing beyond logging when a gossip endpoint is configured. Otherwise
# the interface state is refreshed, and if the link is no longer reported
# a warning is logged and the broker is stopped (if it is running).
def check_interface( * )
        self.log.debug "Checking Zyre interface."
        return if Ravn::Net.gossip

        self.update_interface_state
        return if self.interface_state[ :link ]

        self.log.warn "Zyre interface is gone. Stopping."
        self.stop if self.running?
end
dispatch_zre_event( zre_event )

Dispatch the given zre_event according to its type and registered callbacks.

# File lib/ravn/net/broker.rb, line 458
# Dispatch the given +zre_event+ to every callback and trigger registered
# for its event type (including wildcard registrations).
#
# Returns nil if the event should not be propagated or could not be wrapped
# as a message, false if its type is invalid, and otherwise the number of
# handlers that were called.
#
# Triggers are one-shot: once they have been called they are removed, both
# from the event type's own trigger set and from the wildcard trigger set.
# (Previously only the event type's own set was cleaned up, so a trigger
# registered for WILDCARD_EVENT_TYPE kept firing for every event.)
def dispatch_zre_event( zre_event )
        return unless self.should_propagate?( zre_event )

        event_type = Ravn::Net::Protocol.type_from_zre_event( zre_event ) or
                return ignore_and_log_reason( "Invalid type." )
        handlers = self.all_callbacks_for( event_type )
        self.log.debug "%d handler/s for %p events" % [ handlers.size, event_type ]

        unless handlers.empty?
                message = self.wrap_zre_event( zre_event ) or return nil
                handlers.each do |callback|
                        self.log.debug "calling %p with %p" % [ callback, message ]
                        callback.call( message )
                end

                # `handlers` is a snapshot taken before any callback ran, so triggers
                # registered from inside a callback survive for the next event.
                [ event_type, WILDCARD_EVENT_TYPE ].uniq.each do |type|
                        self.triggers[ type ]&.subtract( handlers )
                end
        end

        return handlers.size
end
endpoint_from_gossip( gossip_uri )

Generate an endpoint for peer communication, using the same network segment that is used to reach the gossip server. (Normally, this is automatic when attaching to an interface - this is explicit to support tunneling interfaces that Zyre normally won’t “see” as valid.)

# File lib/ravn/net/broker.rb, line 547
# Build a `tcp://address:port` endpoint string for peer communication. The
# address is the local address of a UDP socket connected towards the host of
# +gossip_uri+; the port is picked at random from 10000..65534. The chosen
# endpoint is logged and returned.
def endpoint_from_gossip( gossip_uri )
        local_address = UDPSocket.open do |sock|
                sock.connect( gossip_uri.host, 1 )
                sock.addr.last
        end
        endpoint = format( "tcp://%s:%s", local_address, rand(10000..65534) )

        self.log.info( format("Advertising %p as my peer endpoint.", endpoint) )
        return endpoint
end
groups()

The Set of groups the broker will join, or is a member of if it’s already running.

# File lib/ravn/net/broker.rb, line 242
# The groups the broker's node reports as its own (delegates to the node's
# `own_groups`).
def groups
        zyre_node = self.node
        return zyre_node.own_groups
end
handle_enter_zre_event( zre_event ) { || ... }

ZRE event handler – handle ENTER events.

# File lib/ravn/net/broker.rb, line 398
# ZRE event handler for ENTER events. Caches the peer's normalized headers
# (frozen) under its UUID and records its name -> UUID mapping, then yields
# so the caller can continue processing the event. Returns whatever the
# block returns.
def handle_enter_zre_event( zre_event )
        self.log.info( zre_event )

        normalized = Ravn::Net::Protocol.normalize_headers( zre_event.headers )
        uuid = zre_event.peer_uuid

        self.log.info( format("Caching headers for node %s", uuid) )
        self.peer_headers[ uuid ] = normalized.freeze
        self.peer_uuids[ zre_event.peer_name ] = uuid

        self.log.debug "Peer headers cached for: %p" % [ self.peer_headers.keys ]
        self.log.info "Peers are now: %p" % [ self.peer_uuids ]

        yield
end
handle_evasive_zre_event( zre_event )
Alias for: log_zre_event
handle_exit_zre_event( zre_event ) { || ... }

ZRE event handler – handle EXIT events.

# File lib/ravn/net/broker.rb, line 413
# ZRE event handler for EXIT events. Yields first, so the peer's cached
# headers are still available while the event is processed, and only then
# forgets the peer's headers and every name that maps to its UUID.
def handle_exit_zre_event( zre_event )
        self.log.info( zre_event )
        yield

        departed = zre_event.peer_uuid
        self.peer_headers.delete( departed )
        self.peer_uuids.delete_if {|_name, uuid| departed == uuid }
end
handle_join_zre_event( zre_event )
Alias for: log_zre_event
handle_leader_zre_event( zre_event )
Alias for: log_zre_event
handle_leave_zre_event( zre_event )
Alias for: log_zre_event
handle_shout_zre_event( zre_event )
Alias for: log_zre_event
handle_silent_zre_event( zre_event )
Alias for: log_zre_event
handle_stop_zre_event( zre_event ) { || ... }

ZRE event handler – handle STOP events.

# File lib/ravn/net/broker.rb, line 422
# ZRE event handler for STOP events. Logs the event, yields so it can be
# processed, and then stops the broker. Returns the result of #stop.
def handle_stop_zre_event( zre_event )
        self.log.info( zre_event )
        yield

        self.log.debug( "Setting running to false." )
        return self.stop
end
handle_whisper_zre_event( zre_event )
Alias for: log_zre_event
handle_zre_event( zre_event )

Do any local handling for the given zre_event.

# File lib/ravn/net/broker.rb, line 388
# Route +zre_event+ to the public `handle_<type>_zre_event` method that
# matches its (downcased) type. The handler is given a block that dispatches
# the event to registered callbacks. Returns whatever the handler returns.
def handle_zre_event( zre_event )
        # :TODO: Profile this to see if an explicit dispatch table or case/when is faster
        handler = format( "handle_%s_zre_event", zre_event.type.downcase )
        self.public_send( handler, zre_event ) { self.dispatch_zre_event(zre_event) }
end
ignore_and_log_reason( reason )

Log a reason for ignoring an event and return false.

# File lib/ravn/net/broker.rb, line 531
# Log (at debug level) why an event is being ignored. Always returns false
# so callers can `return ignore_and_log_reason(...)` from a predicate.
def ignore_and_log_reason( reason )
        self.log.debug( format("Ignoring event: %s", reason) )
        false
end
inspect_details()

Return the details of inspect output.

# File lib/ravn/net/broker.rb, line 538
# The detail portion of the inspect output: the node name, the node's UUID
# in braces, and the sorted, comma-separated group list in brackets.
def inspect_details
        group_list = self.groups.sort.join( ', ' )
        return format( "%s {%s} [%s]", self.node_name, self.node.uuid, group_list )
end
is_introspection_event?( zre_event )

Returns true for zre_events that should propagate because they’re about the local node.

# File lib/ravn/net/broker.rb, line 491
# Returns true if the type of +zre_event+ is listed in INTROSPECTION_EVENTS.
def is_introspection_event?( zre_event )
        event_type = zre_event.type
        return INTROSPECTION_EVENTS.include?( event_type )
end
join_group( new_group )

Join the specified new_group.

# File lib/ravn/net/broker.rb, line 248
# Join the group named +new_group+ (coerced to a String).
#
# Raises ArgumentError if the group is one of the PROTECTED_GROUPS. Returns
# false without joining if the broker is already a member, true after
# joining otherwise.
def join_group( new_group )
        group_name = new_group.to_s

        if PROTECTED_GROUPS.include?( group_name )
                raise ArgumentError, "can't join protected group %p" % [ group_name ]
        end

        if self.groups.include?( group_name )
                self.log.debug( format("Can't join `%s`: Already a member.", group_name) )
                return false
        end

        self.log.info( format("Joining `%s` network group.", group_name) )
        self.node.join( group_name )

        return true
end
join_initial_groups( *names )

Set up the Broker’s node to immediately join the groups with the given names on start.

# File lib/ravn/net/broker.rb, line 185
# Have the broker's node join each of the groups in +names+ (nested Arrays
# are flattened). Returns the flattened Array of names.
def join_initial_groups( *names )
        names.flatten.each {|group_name| self.node.join(group_name) }
end
leave_group( current_group )

Leave the current_group if the broker is a member of it.

# File lib/ravn/net/broker.rb, line 267
# Leave the +current_group+ if the broker is a member of it.
#
# The group name is coerced to a String first, mirroring #join_group, so a
# group joined as `join_group(:foo)` can also be left as `leave_group(:foo)`.
#
# Returns true if the group was left, false if the broker wasn't a member.
def leave_group( current_group )
        current_group = current_group.to_s

        if self.groups.include?( current_group )
                self.log.info "Leaving `%s` network group." % [ current_group ]
                self.node.leave( current_group )

                return true
        else
                self.log.debug "Can't leave `%s`: Not a member." % [ current_group ]
                return false
        end
end
log_zre_event( zre_event ) { || ... }

Generic ZRE event handler – just log the event if debugging is enabled.

# File lib/ravn/net/broker.rb, line 431
# Generic ZRE event handler: pass the event to the debug log (as a block, so
# it is only evaluated if the logger chooses to) and then yield. Returns
# whatever the block returns.
def log_zre_event( zre_event )
        self.log.debug do
                zre_event
        end

        return yield
end
off( event_type, callback )

Unregister the given callback for events of the given event_type; this removes both handlers and triggers.

# File lib/ravn/net/broker.rb, line 335
# Unregister the given +callback+ for events of the given +event_type+; this
# removes it from both the handlers and the triggers for that type.
#
# A message is logged only for a registry the callback was actually removed
# from. (Set#delete returns the set whether or not the element was present,
# so Set#delete? is used to detect a real removal.)
def off( event_type, callback )
        if self.callbacks[ event_type ]&.delete?( callback )
                self.log.info "Will no longer call %p for %p events." % [ callback, event_type ]
        end

        if self.triggers[ event_type ]&.delete?( callback )
                self.log.info "Cancelled trigger %p for %p events." % [ callback, event_type ]
        end
end
on( *event_types, &callback )

Register the given callback for events of the given event_types.

# File lib/ravn/net/broker.rb, line 306
# Register the given +callback+ for events of each of the given
# +event_types+. Returns the callback so it can later be passed to #off.
#
# Registration is logged only when the callback is newly added for a type.
# (Set#add returns the set even if the element was already present, so
# Set#add? is used to detect a new registration.)
def on( *event_types, &callback )
        event_types.each do |event_type|
                handlers = ( self.callbacks[event_type] ||= Set.new )

                if handlers.add?( callback )
                        self.log.info "Will call %p for %p events." % [ callback, event_type ]
                end
        end

        return callback
end
on_all_events( &callback )
Alias for: on_any_event
on_any_event( &callback )

Register the given callback for all events. NOTE: this should not be used for anything other than diagnostic tools.

# File lib/ravn/net/broker.rb, line 321
# Register the given +callback+ for all events by adding it to the
# WILDCARD_EVENT_TYPE handlers. Returns the callback.
#
# NOTE: this should not be used for anything other than diagnostic tools.
#
# The warning is logged only when the callback is newly added. (Set#add
# returns the set even if the element was already present, so Set#add? is
# used to detect a new registration.)
def on_any_event( &callback )
        handlers = ( self.callbacks[WILDCARD_EVENT_TYPE] ||= Set.new )

        if handlers.add?( callback )
                self.log.warn "Will call %p for ALL events." % [ callback ]
        end

        return callback
end
Also aliased as: on_all_events
once( event_type, &callback )

Register the given callback as a trigger that is called once for the first event of the given event_type and then unregistered. Registering a callback as both a trigger and a handler will still only call it once per event of the given event_type.

# File lib/ravn/net/broker.rb, line 349
# Register the given +callback+ as a trigger for the given +event_type+.
# Returns the callback so it can later be passed to #off.
#
# Registration is logged only when the callback is newly added. (Set#add
# returns the set even if the element was already present, so Set#add? is
# used to detect a new registration.)
def once( event_type, &callback )
        pending = ( self.triggers[event_type] ||= Set.new )

        if pending.add?( callback )
                self.log.info "Will call %p for the first %p event that occurs." % [ callback, event_type ]
        end

        return callback
end
peer_speaks_same_protocol?( peer_uuid )

Returns true if the peer with the given peer_uuid speaks the same protocol version we do.

# File lib/ravn/net/broker.rb, line 498
# Returns true if the peer with the given +peer_uuid+ advertised a protocol
# version (in its cached `:helios_protocol_version` header) that
# Ravn::Net::Protocol reports as supported. In every other case -- no cached
# headers, no version header, unsupported version -- the reason is logged
# via #ignore_and_log_reason and false is returned.
def peer_speaks_same_protocol?( peer_uuid )
        headers = self.peer_headers[ peer_uuid ]
        unless headers
                return ignore_and_log_reason(
                        format("No headers registered for peer (%p).", self.peer_headers.keys) )
        end

        peer_version = headers[ :helios_protocol_version ]
        return ignore_and_log_reason( "Peer doesn't have protocol version header." ) unless peer_version

        # :TODO: Support older protocol versions when there are some
        self.log.debug( format("Event sent from peer speaking version %p of the protocol.", peer_version) )

        return true if Ravn::Net::Protocol.supported_version?( peer_version )

        return ignore_and_log_reason(
                format("Peer doesn't speak the same protocol version (%d)", Ravn::Net::Protocol.version) )
end
running()

True if the broker is running, false if it hasn’t been started or is shutting down.

# File lib/ravn/net/broker.rb, line 150
# Declares the broker's `running` flag. The rest of the class reads it via
# `running?` and sets it via `running=`; presumably both are generated by
# this macro, which is defined outside this file -- TODO confirm.
attr_predicate_accessor :running
send_message( *messages )
Alias for: send_messages
send_messages( *messages )

Send the specified messages.

# File lib/ravn/net/broker.rb, line 281
# Encode and send each of the given +messages+ (nested Arrays are
# flattened). A message whose encoded group is 'direct' is whispered to the
# peer named in its `to` field; anything else is shouted to its group.
#
# Raises Ravn::Net::MessageError for a direct message with no `to` field.
# A direct message addressed to an unknown peer is dropped with a warning.
# Returns the flattened Array of messages.
def send_messages( *messages )
        messages.flatten.each do |message|
                group, type, data = Ravn::Net::Protocol.encode( message )

                unless group == 'direct'
                        self.log.debug(
                                format("Broadcasting %s message to the %s group (%d bytes).", type, group, data.bytesize) )
                        self.node.shout( group, type, data )
                        next
                end

                recipient = message.to
                unless recipient
                        raise Ravn::Net::MessageError, "can't send a direct message without a `to' field"
                end

                peer_uuid = self.peer_uuids[ recipient ]
                unless peer_uuid
                        self.log.warn( format("Dropping message to %p: no such peer", recipient) )
                        next
                end

                self.log.debug( format("Sending direct %s message to %s (%s)", type, recipient, peer_uuid) )
                self.node.whisper( peer_uuid, type, data )
        end
end
Also aliased as: send_message
set_options( **options )

Set up the broker according to the given options Hash.

# File lib/ravn/net/broker.rb, line 177
# Apply the given +options+ to the broker. Currently the only option acted
# on here is `:groups`, whose value is handed to #join_initial_groups.
def set_options( **options )
        self.log.info "Setting options: %p" % [ options ]

        initial_groups = options[ :groups ]
        self.join_initial_groups( initial_groups ) if initial_groups
end
setup_interface_timer()

Create the timer for interface checks.

# File lib/ravn/net/broker.rb, line 558
# Build (but do not execute) the Concurrent::TimerTask that runs
# #check_interface with a 5-second execution interval, with a
# Ravn::LoggingTaskObserver attached. Returns the timer task.
def setup_interface_timer
        self.log.info "Starting zyre interface checks"

        Concurrent::TimerTask.new( &self.method(:check_interface) ).tap do |task|
                task.execution_interval = 5
                task.add_observer( Ravn::LoggingTaskObserver.new(:interface_timer) )
        end
end
should_propagate?( zre_event )

Examine the given zre_event and return true if it looks like it should be propagated to message handlers.

# File lib/ravn/net/broker.rb, line 482
# Decide whether +zre_event+ should be passed on to message handlers:
# STOP events always are, other introspection events never are, and
# everything else depends on whether the sending peer speaks a supported
# protocol version.
#
# NOTE(review): the descriptions of INTROSPECTION_EVENTS and
# #is_introspection_event? say those events *should* propagate, but this
# method returns false for them (other than STOP) -- confirm which is
# intended.
def should_propagate?( zre_event )
        return true if zre_event.type == :STOP

        if self.is_introspection_event?( zre_event )
                false
        else
                self.peer_speaks_same_protocol?( zre_event.peer_uuid )
        end
end
start()

Join the network and start consuming ZRE events.

# File lib/ravn/net/broker.rb, line 199
# Configure the Zyre node (headers, interface and, when a gossip URI is
# configured, an explicit endpoint plus gossip connection), start the
# interface timer, and spawn the thread that runs #start_handling_events.
# Returns that Thread.
def start
        interface = Ravn::Net.interface
        self.log.info( format("Starting node on %s.", interface) )

        Zyre.disable_zsys_handler
        self.node.headers   = Ravn::Net::Protocol.node_headers
        self.node.interface = interface

        gossip_uri = Ravn::Net.gossip
        if gossip_uri
                self.log.warn( format("Discovering peers via gossip: %p", gossip_uri.to_s) )
                self.node.endpoint = self.endpoint_from_gossip( gossip_uri )
                self.node.gossip_connect( gossip_uri.to_s )
        end

        self.interface_timer.execute

        @thread = Thread.new { self.start_handling_events }
        return @thread
end
start_handling_events()

Thread work method – start reading events as they come in and dispatching them appropriately as Messages.

# File lib/ravn/net/broker.rb, line 362
# Thread work method: mark the broker as running, start the Zyre node, and
# keep receiving ZRE events and handing them to #handle_zre_event for as
# long as the broker is running. An unhandled exception in this thread
# aborts the process (abort_on_exception is enabled). Returns the elapsed
# time between entering the method and leaving the loop, as measured by
# Ravn.monotonic_time.
def start_handling_events
        worker = Thread.current
        worker.abort_on_exception = true
        worker.name = "Net::Broker event handler"
        started_at = Ravn.monotonic_time

        self.log.info "Starting Zyre %s node %s(%s)." %
                [ Zyre.zyre_version, self.node.name, self.node.uuid ]
        self.running = true
        self.node.start

        self.log.info "Started handling events."
        while self.running?
                zre_event = self.node.recv

                # A falsy read is treated as an interrupted read: just try again.
                unless zre_event
                        self.log.warn "Read was interrupted: trying again."
                        next
                end

                self.log.debug( format("Handling a %p ZRE event.", zre_event.type) )
                self.handle_zre_event( zre_event )
        end
        self.log.info "Done handling events."

        return Ravn.monotonic_time - started_at
end
started?()

Returns true if the Broker’s been started.

# File lib/ravn/net/broker.rb, line 193
# Returns true if the broker's thread exists and is alive, false if it
# exists but has finished, and nil if the broker has never been started.
def started?
        worker = self.thread
        return nil if worker.nil?

        return worker.alive?
end
stop()

Stop listening for events and leave the network.

# File lib/ravn/net/broker.rb, line 232
# Stop the broker: shut down the interface timer, clear the running flag,
# and stop the Zyre node if there is one. Returns the result of stopping the
# node (nil when there is no node).
def stop
        self.log.info( "Stopping node." )

        self.interface_timer.shutdown
        self.running = false

        zyre_node = self.node
        return zyre_node.nil? ? nil : zyre_node.stop
end
update_interface_state()

Build a status hash for external interrogation.

# File lib/ravn/net/broker.rb, line 221
# Refresh and return the interface state Hash: `:interface` is the
# configured interface, and `:link` is true when a gossip endpoint is
# configured or when Zyre lists the configured interface.
def update_interface_state
        link_up =
                if Ravn::Net.gossip
                        true
                else
                        Zyre.interfaces.include?( Ravn::Net.interface )
                end

        @interface_state = { interface: Ravn::Net.interface, link: link_up }
end
wrap_zre_event( zre_event )

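Decode the given zre_event into a protocol message, using the cached headers of the peer that sent it. Returns nil (after logging a warning) if decoding raises a Ravn::Net::MessageError.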

# File lib/ravn/net/broker.rb, line 518
# Decode +zre_event+ into a message via Ravn::Net::Protocol.decode, passing
# along the cached headers of the peer that sent it. Returns the decoded
# message, or nil (after logging a warning) if decoding raises a
# Ravn::Net::MessageError.
def wrap_zre_event( zre_event )
        sender_headers = self.peer_headers[ zre_event.peer_uuid ]
        decoded = Ravn::Net::Protocol.decode( sender_headers, zre_event )

        self.log.debug { decoded }
        decoded
rescue Ravn::Net::MessageError => err
        self.log.warn( format("%p while wrapping a ZRE event: %s", err.class, err.message) )
        nil
end