Ravn::BDE::

BoltRunner class

Constants

HEARTBEAT_INTERVAL

How often to run the heartbeat callback in running bolts

Attributes

heartbeat_timer R

The Concurrent::TimerTask that emits the heartbeat event

mission_config R

The current mission configuration

state_manager R

The state manager (Actor)

Public Class Methods

new( mission_config )

Create a new BoltRunner for the specified mission configuration.

# File lib/ravn/bde/bolt_runner.rb, line 23
# Set up a runner around +mission_config+. The heartbeat timer is built here
# but not executed, and the state manager reference starts out as +nil+
# (it is assigned in #start).
def initialize( mission_config )
        @mission_config  = mission_config
        @heartbeat_timer = make_heartbeat_timer
        @state_manager   = nil

        # Bare `super` forwards +mission_config+ on to the parent initializer.
        super
end

Public Instance Methods

handle_finish_bolt( *args )

Handler for finishing bolts.

# File lib/ravn/bde/bolt_runner.rb, line 107
# Handle a request to finish a bolt. After flattening, the first argument is
# the bolt and the (optional) second is an event to hand back to the sender
# once it has been restarted; anything further is ignored.
#
# The sender of the current message is paused (synchronously), the state
# manager is asked (synchronously) to finish the bolt, and then the sender is
# told to restart and, if one was given, sent the forwarded event.
def handle_finish_bolt( *args )
        finished_bolt, pending_event = args.flatten.first( 2 )
        sender = self.envelope.sender

        self.log.debug( format("Pausing bolt: %s", finished_bolt.id) )
        sender.ask!( :pause! )

        self.state_manager.ask!( finish_bolt: finished_bolt )

        self.log.debug( format("Restarting bolt: %s", finished_bolt.id) )
        sender.tell( :restart! )

        return nil unless pending_event

        sender.tell( pending_event )
end
handle_heartbeat( *args )

Relay the heartbeat event to all running bolts.

# File lib/ravn/bde/bolt_runner.rb, line 124
# Fan a heartbeat out to the children of this actor: each one is told
# <tt>heartbeat: true</tt>. Any arguments that came with the event are ignored.
def handle_heartbeat( *args )
        self.children.each {|child| child.tell(heartbeat: true) }
end
make_heartbeat_timer()

Create a TimerTask that will emit heartbeat events to all running bolts.

# File lib/ravn/bde/bolt_runner.rb, line 138
# Build (but do not execute) the Concurrent::TimerTask that drives heartbeats.
# The task invokes #send_heartbeat, has its execution interval set to
# HEARTBEAT_INTERVAL, and is observed by a Ravn::LoggingTaskObserver created
# with <tt>:heartbeat_timer</tt>. Returns the configured task.
def make_heartbeat_timer
        Concurrent::TimerTask.new( &self.method(:send_heartbeat) ).tap do |task|
                task.execution_interval = HEARTBEAT_INTERVAL
                task.add_observer( Ravn::LoggingTaskObserver.new(:heartbeat_timer) )
        end
end
send_heartbeat( * )

Send a heartbeat event.

# File lib/ravn/bde/bolt_runner.rb, line 132
# Post a <tt>heartbeat: true</tt> message to this actor. Any arguments are
# accepted and discarded, so this can be used directly as a timer callback.
def send_heartbeat( * )
        tell( heartbeat: true )
end
start()

Overridden — look up the state manager, start the configured bolts, and start the heartbeat timer

# File lib/ravn/bde/bolt_runner.rb, line 51
# Look up the <tt>:state_manager</tt> actor and remember it, start the
# configured bolts, then execute the heartbeat timer. Raises a RuntimeError
# if no state manager reference can be found.
def start
        @state_manager = Ravn::Actor[ :state_manager ]
        raise "Unable to get a state manager reference." unless @state_manager

        self.start_bolts
        self.heartbeat_timer.execute
end
start_bolts()

Load the configured bolts and return their Actor references.

# File lib/ravn/bde/bolt_runner.rb, line 60
# Start every bolt described in the <tt>:bolts</tt> section of the mission
# config and return an Array of whatever #start_configured_bolt returned for
# each one, in configuration order.
#
# Each entry is passed to #start_configured_bolt along with its zero-based
# position in the section. Raises a RuntimeError if the mission config has no
# <tt>:bolts</tt> section.
def start_bolts
        bolts_config = self.mission_config[ :bolts ] or
                raise "No bolts config section in mission config: %p!" % [ self.mission_config ]
        self.log.info "Loading bolts: %p" % [ bolts_config ]

        # #map rather than #each: with #each the block's results are thrown
        # away and the config collection itself would be returned.
        bolts_config.map.with_index do |(bolt_id, config), i|
                self.log.debug "Starting bolt %s: %p" % [ bolt_id, config ]
                self.start_configured_bolt( bolt_id, config, i )
        end
end
start_configured_bolt( bolt_id, config, index )

Start the bolt described by the given config, which should be a Hash of (at least) the following members:

[:type] the type of the bolt (required)
[:name] the tailored name of the bolt
[:text] the text description for bolt notifications, history entries, etc. (required)
[:components] a hash of component types and associated config hashes

# File lib/ravn/bde/bolt_runner.rb, line 79
# Tailor and spawn the bolt identified by +bolt_id+ from its +config+ Hash,
# returning whatever the tailored class's +spawn+ returns.
#
# Config keys may be Strings or Symbols; they are symbolized before use.
# <tt>:type</tt> and <tt>:text</tt> are required, <tt>:name</tt> is only used
# for logging here, and the whole config is passed to Ravn::BDE::Bolt.tailor
# as keyword arguments. +index+ is appended to the tailored class's path name
# to form the spawn path.
#
# Raises a RuntimeError if the type or text is missing, or if the state
# manager does not provide a state handle for the bolt.
def start_configured_bolt( bolt_id, config, index )
        config = config.transform_keys( &:to_sym )
        type, name, text = config.values_at( :type, :name, :text )
        raise "No type specified for bolt %s: %p" % [ bolt_id, config ] unless type
        raise "No bolt text specified for bolt %s: %p" % [ bolt_id, config ] unless text

        self.log.debug "  tailoring %s%s bolt (%s) with config: %p" % [
                type,
                name ? " - #{name}" : '',
                bolt_id,
                config
        ]
        tailored_class = Ravn::BDE::Bolt.tailor( bolt_id, **config )
        self.log.debug "Tailored class is: %p" % [ tailored_class ]

        # Use the blocking #ask! (as #handle_finish_bolt does) so that the
        # reply itself is checked and handed to the bolt.
        # NOTE(review): assumes the state manager is a concurrent-ruby style
        # actor reference, where plain #ask returns a future that is always
        # truthy and would make the guard below unreachable — confirm.
        state_handle = self.state_manager.ask!( get_proxy: bolt_id ) or
                raise "StateProxy manager didn't return a state handle for %s!" % [ bolt_id ]

        path = "%s_%d" % [ tailored_class.path_name, index ]
        self.log.info "Spawning %p [%s]" % [ tailored_class, path ]
        return tailored_class.spawn( path, state_handle )
end