Ravn::BDE::
StateManager
class
Maintain state of active (interactive) bolts on disk, and write “finished” bolts to a historical database.
- DESERIALIZERS
De-serializer procs for MDBX
- SERIALIZERS
Serializer procs for MDBX
- state R
The MDBX::Database state database handle.
Create a new StateManager
.
def initialize
@state = nil
super
end
Remove the MDBX database from disk.
def self::reset
self.log.warn "Purging state database at: %s" % [ Ravn::BDE.state_db_path ]
Ravn::BDE.state_db_path.rmtree
end
emit_finished_event( bolt, final_bolt_state )
Emit ‘bolt.finished’ event for the given bolt
.
def emit_finished_event( bolt, final_bolt_state )
data = {
bolt: bolt.fields,
final_state: final_bolt_state,
instance_id: bolt.id
}
type = bolt.class.expand_placeholders( 'bolt.finished.${bolt_id}' )
event = Ravn::BDE::Message.new( type,
data: data,
opex: Ravn::BDE.mission[ :name ]
)
self.log.warn "Emitting a finished event: %p" % [ event ]
self.filter_up( event )
end
handle_fetch( bolt_id, key )
Handler for fetching the value for the given key
.
def handle_fetch( bolt_id, key )
self.state.collection( bolt_id ) do
return self.state[ key ]
end
end
handle_finish_bolt( bolt )
Finish up the given bolt
and prep states for a new instance.
def handle_finish_bolt( bolt )
self.log.info "Finishing %s bolt %s (%0.3f)" % [ bolt.name, bolt.id, bolt.start_time ]
final_bolt_state = self.save_bolt_state( bolt )
self.emit_finished_event( bolt, final_bolt_state )
end
handle_get_proxy( bolt_id )
Fetch a StateProxy object that can be used to get and set values for the bolt with the given bolt_id
.
def handle_get_proxy( bolt_id )
raise "StateManager hasn't been started yet." unless self.started?
return Ravn::BDE::StateProxy.new( self.reference, bolt_id )
end
handle_store( bolt_id, key, val )
Handler for storing a key value pair for the given bolt_id
.
def handle_store( bolt_id, key, val )
self.state.collection( bolt_id ) do
return self.state[ key ] = val
end
end
Return the details of inspect output.
def inspect_details
base = super
return "%s %s" % [ base, File.expand_path(self.state.path) ]
end
Handle lifecycle events of the manager.
def on_event( event )
event_name, _ = event
case event_name
when Exception, :paused, :terminated, :resetting, :restarting
self.stop
else
self.log.warn "Unhandled event %p" % [ event_name ]
end
end
Purge bolt state from runtime storage and move it into history.
def save_bolt_state( bolt )
serialized_state = nil
self.state.collection( bolt.class.id ) do |bolt_state|
bolt_state.snapshot do
default_state = bolt.class.stateful_variable_defaults
serialized_state = default_state.transform_keys( &:to_s ).merge( bolt_state.to_h )
Ravn::BDE::MissionRecord::Bolt.create(
bolt_id: bolt.class.id,
instance_id: bolt.id,
state: serialized_state
)
end
end
self.log.warn "Dropping state for %s bolt [%s]" % [ bolt.class.name, bolt.class.id ]
self.state.drop( bolt.class.id )
self.state.collection( bolt.class.id ) do
self.log.debug "Desc should be nil, was: %p" % self.state[ :description ]
end
return serialized_state
end
Start the manager. If the state path is absolute, use it directly. Otherwise, it is relative to the mission directory.
def start
self.init_state_db
self.log.info "Starting! Opening persistant state store."
self.log.debug "Persisted states set to %p" % [ self.state ]
super
end
Returns true
if the manager has been started.
def started?
return self.state && ! self.state.closed?
end
Perform any necessary cleanup routines.
def stop
self.state&.close
end
Protected Instance Methods
Startup actions to initialize the state database.
def init_state_db
dbpath = Ravn::BDE.state_db_path
dbpath.dirname.mkpath
self.log.info "Opening state DB: %s" % [ dbpath ]
@state = MDBX::Database.open(
dbpath.to_s,
max_collections: self.class.max_collections
)
self.log.info "Using the %p serializer." % [ self.class.serializer ]
@state.serializer = SERIALIZERS[ self.class.serializer ]
@state.deserializer = DESERIALIZERS[ self.class.serializer ]
end