Ravn::Net::Broker::
Election
module
Adds election functionality.
- leadership R
The leadership Hash that tracks which node is the leader, keyed by group.
- leadership_resolutions R
A Hash of Futures keyed by group name that resolve to true or false to indicate whether this Broker
is the leader of the group.
Override the initializer to add instance data for elections.
def initialize( * )
@leadership = {}
@leadership_resolutions = {}
super
end
Inclusion hook – check to be sure election is actually supported.
def self::prepended( mod )
raise NotImplementedError,
"this system's libzyre is too old for election support" unless
Zyre.has_draft_apis?
Ravn::Net::Broker.log.debug "adding election functionality to brokers"
super
end
election_finished?( group_name )
Returns true
if an election has happened in the group with the given group_name
.
def election_finished?( group_name )
return self.leadership.key?( group_name )
end
handle_leader_zre_event( zre_event )
Override the leader event handler to register the leader of groups.
def handle_leader_zre_event( zre_event )
leader = zre_event.peer_uuid
groupname = zre_event.group
self.log.info "leader of '%s' has been elected: %s" % [ groupname, leader ]
self.leadership[ groupname ] = leader
unless self.leadership_resolutions[ groupname ].resolved?
self.leadership_resolutions[ groupname ].fulfill( leader == self.node.uuid )
end
super
end
Returns true
if the receiving node is the elected leader in the group with the given group_name
.
def leader_of?( group_name )
return false unless self.election_finished?( group_name )
return self.leadership[ group_name ] == self.node.uuid
end
Set up leadership in groups which are designated in an elect_leader
option.
def set_options( **options )
leader_groups = Array( options[:elect_leader] )
options[:groups] |= leader_groups
super
leader_groups.each do |group|
self.log.info "setting up %s group to elect a leader" % [ group ]
self.leadership_resolutions[ group ] = Concurrent::Promises.resolvable_future
self.node.set_contest_in_group( group )
end
end
Override to reject any pending leadership resolution futures.
def stop
self.log.debug "Checking for unresolved leadership futures..."
self.leadership_resolutions.each do |group, promise|
next if promise.resolved?
self.log.warn "Rejecting leadership resolution for %s" % [ group ]
promise.reject( "Broker is shutting down." )
end
super
end