A coordinated workflow contract for the Battlefield Decision Engine.
Emits: - bolt.initiated.{uuid} - bolt.sent.{uuid} - bolt.received.{uuid} - bolt.responded.{uuid} - bolt.summary.{uuid}
Consumes: - bolt.initiate.{uuid} - bolt.send.{uuid} - bolt.sent.{uuid} - bolt.respond.{uuid} - bolt.responded.{uuid}
- DEFAULT_TIMEOUT
The default timeout if none is configured
- PHASE_METHODS
The methods that are triggered by the given events in the execution of the workflow across multiple nodes.
- PLACEHOLDER_PATTERN
A pattern for placeholders in workflow strings that get expanded in various places.
- VALID_NAME
Pattern for matching valid Bolt
name attribute
- VALID_TEXT
Pattern for matching valid Bolt
text attribute
- VALID_TYPE
Pattern for matching valid Bolt
type attribute
- expires_at RW
The Monotonic time at which the bolt will finish via timeout
- start_time R
The monotonic time when this bolt was spawned
add_components( components )
Add the configured components
to the receiving Bolt
.
# Find, validate, and apply each of the configured +components+ to the
# receiving Bolt class.
#
# +components+ is iterated as pairs of a component type and its options. The
# option keys are symbolized before being handed to the component's validator,
# and the validated options are passed to the component's +apply+ as keywords.
# A component listed with +nil+ options is treated as having no options.
#
# Raises Ravn::BDE::InvalidBoltConfig if the component type can't be loaded or
# if the component rejects its options.
def self::add_components( components )
	components.each do |type, opts|
		# A component configured without any settings arrives as nil; normalize
		# it so it is validated and applied instead of raising NoMethodError.
		opts = ( opts || {} ).transform_keys( &:to_sym )
		self.log.debug "Finding and applying the %s component with options: %p" %
			[ type, opts ]

		component = Ravn::BDE::Bolt::Component.find( type )
		opts = component.validate( opts )
		component.apply( self, **opts )
	rescue LoadError => err
		raise Ravn::BDE::InvalidBoltConfig, "Unknown component type \"%s\" (%s)" % [ type, err.message ]
	rescue Ravn::BDE::ValidationError => err
		raise Ravn::BDE::InvalidBoltConfig, err.message
	end
end
compile_message_handlers()
Gather message types from each workflow and add the callbacks to the
message_handlers.
# Expand the placeholders in every message type template of the
# message_handler_map and register the associated handler under the
# resulting message type.
def self::compile_message_handlers
	self.message_handler_map.each do |template, callback|
		expanded_type = self.expand_placeholders( template )
		log_args = [ template, expanded_type, callback ]

		self.log.debug( "Installing a message handler: %s (%s): %p" % log_args )
		self.register_message_handler( expanded_type, callback )
	end
end
expand_placeholders( string )
Expand placeholders in the given string
using config, variables, etc.
# Return a copy of +string+ in which every match of PLACEHOLDER_PATTERN has
# been replaced by the matching attribute of the receiving Bolt class, or by
# the device ID.
#
# Supported placeholder names are +bolt_id+, +bolt_name+ (falls back to the
# type), +bolt_type+, +bolt_text+, and +device_id+. Any other name raises a
# RuntimeError.
def self::expand_placeholders( string )
	expanded = string.gsub( PLACEHOLDER_PATTERN ) do
		# The pattern's :placeholder group holds the bare placeholder name.
		placeholder = Regexp.last_match( :placeholder )

		case placeholder
		when 'bolt_id'   then self.id.to_s
		when 'bolt_name' then ( self.name || self.type ).to_s
		when 'bolt_type' then self.type.to_s
		when 'bolt_text' then self.text.to_s
		when 'device_id' then Ravn.device_id
		else
			raise "No expansion for `%s' in %p" % [ placeholder, string ]
		end
	end

	return expanded
end
Inheritance hook – add some variables to all +subclass+es.
# Inheritance hook -- after the superclass hook has run, give the new
# +subclass+ its own copy of PHASE_METHODS as its message handler map.
def self::inherited( subclass )
	super

	handler_map = PHASE_METHODS.dup
	subclass.instance_variable_set( :@message_handler_map, handler_map )
end
Return a human-readable version of the class suitable for debugging. This is overridden for tailored (anonymous) subclasses.
# Return a human-readable version of the class suitable for debugging.
#
# When the class has an ID (i.e., it is a tailored subclass), the first
# occurrence of "Class" in the inherited inspect output is replaced with the
# bolt's type, its name (when one is set), and its ID. Otherwise the inherited
# output is returned unchanged.
def self::inspect
	string = super
	return string unless self.id

	details = "%s%s Bolt {%s}" % [
		self.type,
		@name ? " - #@name" : '',
		self.id
	]

	# Use the block form so the details are inserted literally; as a
	# replacement-string argument, backslash sequences in the name or type
	# would be interpreted as backreferences.
	return string.sub( /Class/ ) { details }
end
Returns true
if the bolt has a vernacular name in addition to its semantic type.
# Returns +true+ if the bolt has a name set (in addition to its type),
# +false+ otherwise.
def self::is_draft?
	!!@name
end
Returns true
if the bolt has been marked as one that gets relayed by gateways.
# Returns +true+ if the bolt has an ontological suffix, which marks it as one
# that gets relayed by gateways; +false+ otherwise.
def self::is_relayed?
	!!self.ontological_suffix
end
The bolt name when it’s set; falls back to the type
.
# The bolt name when one is set; otherwise the bolt type.
def self::name
	return self.type unless @name
	@name
end
Create a new bolt instance.
# Create a new bolt instance: log the creation, assign a freshly generated ID,
# record the monotonic start time, and start out uninitiated with no
# expiration before handing off to the superclass initializer.
def initialize
	self.log.debug "Creating an instance of %p" % [ self ]

	@id         = Ravn.uuid.generate
	@start_time = Ravn.monotonic_time

	# No deadline and not yet initiated until the workflow gets going.
	@expires_at, @initiated = nil, false

	super()
end
Return a version of the name of the tailored bolt suitable for use in a path in the actor tree.
# Return a version of the tailored bolt's name (or its type, when it has no
# name) suitable for use in a path in the actor tree: downcased, with every
# non-alphanumeric character replaced by an underscore.
def self::path_name
	label = self.name || self.type
	lowered = label.downcase
	return lowered.gsub( /\P{alnum}/, '_' )
end
segment_for_config( config )
Given a segment config
, return an equivalent Ravn::BDE::Segment
.
# Given a segment +config+, return the equivalent segment.
#
# * +nil+ yields Ravn::BDE::Segment.default.
# * A String or Symbol is looked up by name in Ravn::BDE.segments; the Symbol
#   +:default+ is looked up as 'Everyone'.
#
# Raises Ravn::BDE::ValidationError if the named segment doesn't exist or if
# +config+ is any other kind of object.
def self::segment_for_config( config )
	return Ravn::BDE::Segment.default if config.nil?

	unless config.is_a?( String ) || config.is_a?( Symbol )
		raise Ravn::BDE::ValidationError, "no such segment %p" % [ config ]
	end

	requested = ( config == :default ) ? 'Everyone' : config
	found = Ravn::BDE.segments[ requested.to_s ]
	raise Ravn::BDE::ValidationError, "no such segment %s" % [ requested ] unless found

	return found
end
tailor( id, type:, name: nil, text: "${bolt_name}", timeout: DEFAULT_TIMEOUT, to: nil, components: {}, **options ) { |subclass| ... }
Create a new tailored Bolt
with the given uuid
and name
.
# Create a new tailored Bolt: an anonymous subclass of the receiver carrying
# the given +id+, +type+, +name+, +text+, +timeout+, and +components+ as
# class-level instance variables.
#
# +to+ is resolved into a segment via ::segment_for_config, and the
# :ontological_suffix entry of +options+ (if any) is stored as well. If a block
# is given, the subclass is yielded to it before its components are added and
# its message handlers are compiled.
#
# Returns the new subclass.
def self::tailor( id, type:, name: nil, text: "${bolt_name}", timeout: DEFAULT_TIMEOUT,
	to: nil, components: {}, **options )

	self.log.info "Tailoring '%s' bolt: %s (components: %p) with options: %p" %
		[ type, id, components, options ]

	subclass = Class.new( self )

	settings = {
		:@id                 => id,
		:@type               => type,
		:@name               => name,
		:@text               => text,
		:@components         => components,
		:@timeout            => timeout,
		:@to                 => self.segment_for_config( to ),
		:@ontological_suffix => options[:ontological_suffix],
	}
	settings.each do |ivar, value|
		subclass.instance_variable_set( ivar, value )
	end

	# Let the caller customize the subclass before components are applied.
	yield( subclass ) if block_given?

	subclass.add_components( components )
	subclass.compile_message_handlers

	return subclass
end
Validate the specified bolt config
, raising a Ravn::BDE::InvalidBoltConfig
if it isn’t valid.
# Validate the specified bolt +config+, raising a Ravn::BDE::InvalidBoltConfig
# if it isn't valid. Returns +true+ if it is.
#
# Checks, in order:
# * +:type+ is present and matches VALID_TYPE
# * +:name+, when present, matches VALID_NAME
# * +:text+, when present, matches VALID_TEXT
# * +:timeout+, when present, is a non-negative Numeric
# * +:components+, when present, is a Hash
#
# A +:type+, +:name+, or +:text+ that can't be pattern-matched at all (e.g., an
# Integer) is reported as invalid rather than surfacing as a NoMethodError.
def self::validate( config )
	type = config[:type] or raise Ravn::BDE::InvalidBoltConfig, "missing `type' attribute"
	raise Ravn::BDE::InvalidBoltConfig, "invalid `type' attribute" unless
		type.respond_to?( :match? ) && type.match?( VALID_TYPE )

	name = config[:name]
	raise Ravn::BDE::InvalidBoltConfig, "invalid `name' attribute" if
		name && !( name.respond_to?( :match? ) && name.match?( VALID_NAME ) )

	text = config[:text]
	raise Ravn::BDE::InvalidBoltConfig, "invalid `text' attribute" if
		text && !( text.respond_to?( :match? ) && text.match?( VALID_TEXT ) )

	timeout = config[:timeout]
	raise Ravn::BDE::InvalidBoltConfig, "invalid `timeout` attribute" if
		timeout && ( !timeout.is_a?( Numeric ) || timeout < 0 )

	components = config[:components] || {}
	unless components.is_a?( Hash )
		raise Ravn::BDE::InvalidBoltConfig,
			"invalid `components'; expected a Hash, got %p" % [ components.class ]
	end

	return true
end
The component configuration (as a Hash) that has been applied to the tailored Bolt
# Class-level reader for @components, which ::tailor sets to the component
# configuration it was given.
# NOTE(review): singleton_attr_reader is defined outside this view; presumably
# it declares the reader on the singleton class -- confirm.
singleton_attr_reader :components
event_payload_for( phase, originating_event )
Return the ‘data’ section for the bolt message emitted during the given phase
.
# Return the 'data' section for the bolt message emitted during the given
# +phase+. The payload starts out as #fields and is then adjusted per phase:
#
# * +:initiate+ adds the name of the target segment as +:to+.
# * +:send+ adds the target segment's members, minus the local callsign, as
#   +:to+.
# * +:respond+ and +:receive+ replace +:instance_id+ with the one from the
#   +originating_event+, raising a RuntimeError if the event doesn't have one.
#
# Any other phase returns the fields unchanged.
def event_payload_for( phase, originating_event )
	data = self.fields

	if phase == :initiate
		data[ :to ] = self.to.name
	elsif phase == :send
		data[ :to ] = self.to.members.to_a - [ Ravn::BDE.callsign ]
	elsif phase == :respond || phase == :receive
		instance_id = originating_event.data[ :instance_id ]
		data[ :instance_id ] = instance_id
		unless instance_id
			raise "Originating event %s is missing instance_id" % [ originating_event.id ]
		end
	end

	return data
end
expects_response_from_me?()
Returns true if the current user is expected to respond
# Returns a true value if the local callsign is among the callsigns the bolt's
# class expects responses from. Returns +nil+ (never +false+) when it isn't.
def expects_response_from_me?
	expected = self.class.responses_from.include?( Ravn::BDE.callsign )
	return nil unless expected
	return expected
end
Ravn::Message API – return a Hash of fields for this Bolt
when serialized.
# Ravn::Message API -- return a Hash of fields for this Bolt when serialized:
# its instance ID, type, name, and text, plus the relayed flag and ontological
# suffix of its class.
def fields
	payload = {}

	payload[ :instance_id ]        = self.id
	payload[ :type ]               = self.type
	payload[ :name ]               = self.name
	payload[ :text ]               = self.text
	payload[ :is_relayed ]         = self.class.is_relayed?
	payload[ :ontological_suffix ] = self.class.ontological_suffix

	return payload
end
Handle the heartbeat event sent by the runner to expire bolts that have timed out.
# Handle the heartbeat event sent by the runner: if the bolt has timed out,
# tell itself so via a +timeout+ message. Any arguments are ignored.
def handle_heartbeat( * )
	return unless self.timed_out?
	self.tell( timeout: true )
end
Event handler – called when the Bolt
has timed out while executing.
# Event handler -- called when the Bolt has timed out while executing. Logs a
# warning and finishes the bolt. Any arguments are ignored.
def handle_timeout( * )
	warning = "%p timed out." % [ self ]
	self.log.warn( warning )

	return self.finish_bolt
end
The unique identifier of a tailored Bolt
# Class-level reader for @id, the identifier ::tailor assigns to a tailored
# Bolt class.
singleton_attr_reader :id
Initiate a new run of the Bolt’s workflow
# Initiate a new run of the Bolt's workflow in response to +event+.
#
# On the first initiation the bolt is marked as initiated and a correlated
# "bolt.initiated" message is sent with the :initiate payload. If the bolt
# has already been initiated, it is finished instead, with +event+ passed
# along to #finish_bolt.
def initiate_phase( event )
	self.log.warn "Initiated with: %p" % [ event ]

	unless self.initiated?
		self.initiated = true
		data = self.event_payload_for( :initiate, event )
		return self.send_correlated_message( "bolt.initiated.${bolt_id}", event, data: data )
	end

	self.log.warn "Initiated again before finishing; restarting a new instance"
	return self.finish_bolt( event )
end
True if the bolt has been initiated.
# Declares the 'initiated' attribute; #initialize starts it out as false.
# NOTE(review): attr_predicate_accessor is defined outside this view;
# presumably it provides the #initiated? and #initiated= used by
# #initiate_phase -- confirm.
attr_predicate_accessor :initiated?
Returns true
if the given event
is addressed to the receiving bolt.
# Returns a true value if the given +event+ is addressed to the receiving
# bolt.
#
# A Ravn::Net::Message must have a callsign and carry this bolt's ID as its
# :instance_id; a Ravn::HAL::Message only needs the matching :instance_id. Any
# other kind of event is never a match.
def is_target_of?( event )
	if event.is_a?( Ravn::Net::Message )
		return event.callsign && event.data[:instance_id] == self.id
	end

	return event.data[:instance_id] == self.id if event.is_a?( Ravn::HAL::Message )

	return false
end
The map of events to the methods they invoke
# Class-level reader for @message_handler_map, which ::inherited initializes to
# a copy of PHASE_METHODS for each subclass.
singleton_attr_reader :message_handler_map
The subtype of the bolt when emitted up and out
# Class-level reader for @ontological_suffix, which ::tailor sets from the
# :ontological_suffix option. ::is_relayed? is true when this is set.
singleton_attr_reader :ontological_suffix
Returns true
if the bolt state is not expected to change further. This should be overridden by components that will add more state in later phases.
# Returns +true+ if the bolt state is not expected to change further. This
# base implementation always does; components that add more state in later
# phases are expected to override it.
def ready_to_be_finished?
	true
end
Start the workflow from an initiating BDE, showing a notification on the display(s).
# Handle a bolt arriving from an initiating BDE. If the +event+ has no
# recipients, or the local callsign is among them, a correlated
# "bolt.received" message is sent with the :receive payload. Otherwise
# nothing is sent.
def receive_phase( event )
	self.log.info "Received: %p" % [ event ]

	addressees = event.to || []
	for_me = addressees.empty? || addressees.include?( Ravn::BDE.callsign )
	return unless for_me

	data = self.event_payload_for( :receive, event )
	return self.send_correlated_message( 'bolt.received.${bolt_id}', event, data: data )
end
If the message has a response, forward it across the network to the other BDEs.
# Forward a response: send a correlated "bolt.responded" message carrying the
# :respond payload built from +event+.
def respond_phase( event )
	self.log.info "Responding with: %p" % [ event ]

	return self.send_correlated_message(
		'bolt.responded.${bolt_id}',
		event,
		data: self.event_payload_for( :respond, event )
	)
end
Upon confirmation by the watch, start the workflow on the other BDEs
# Start the workflow on the other BDEs: send a correlated "bolt.sent" message
# with the :send payload, set the expiration to the current monotonic time
# plus the class's timeout, and then finish the bolt if it is ready to be
# finished.
def send_phase( event )
	self.log.info "Sent with: %p" % [ event ]

	data = self.event_payload_for( :send, event )
	self.send_correlated_message( 'bolt.sent.${bolt_id}', event, data: data )

	# The timeout clock starts once the bolt has been sent.
	self.expires_at = Ravn.monotonic_time + self.class.timeout

	return unless self.ready_to_be_finished?
	return self.finish_bolt
end
Gather responses and show a summary of them on the display(s).
# Update the summary in response to +event+. Events that aren't addressed to
# this bolt are ignored; otherwise a correlated "bolt.summary" message is sent
# with the :summarize payload, and the bolt is finished if it is ready to be.
def summarize_phase( event )
	return unless self.is_target_of?( event )

	self.log.info "Summary updated with: %p" % [ event ]

	data = self.event_payload_for( :summarize, event )
	self.send_correlated_message( 'bolt.summary.${bolt_id}', event, data: data )

	return unless self.ready_to_be_finished?
	return self.finish_bolt
end
The user-facing content of the bolt
# Class-level reader for @text, which ::tailor sets; it defaults to the
# "${bolt_name}" placeholder there.
singleton_attr_reader :text
Return true if the bolt has timed out.
# Return +true+ if the bolt has an expiration time and the current monotonic
# time is past it, +false+ if it hasn't been reached yet. When no expiration
# has been set, the unset (falsey) value is returned as-is.
def timed_out?
	deadline = self.expires_at
	return deadline unless deadline

	return Ravn.monotonic_time > deadline
end
The user-configured timeout
# Class-level reader for @timeout, which ::tailor sets (DEFAULT_TIMEOUT unless
# one is given). #send_phase adds it to the monotonic time to compute the
# expiration.
singleton_attr_reader :timeout
The segment that describes the callsigns the message is sent to
# Class-level reader for @to, the segment ::tailor resolves from its +to+
# argument via ::segment_for_config.
singleton_attr_reader :to
The semantic type of the tailored Bolt
# Class-level reader for @type, the required +type+ keyword given to ::tailor.
singleton_attr_reader :type
Protected Instance Methods
finish_bolt( forwarded_event=nil )
Complete this bolt’s execution lifetime, saving it to bolt history and resetting state.
# Complete this bolt's execution lifetime by logging a warning and telling the
# parent to finish this bolt. The optional +forwarded_event+ is passed along to
# the parent together with the bolt itself.
def finish_bolt( forwarded_event=nil )
	suffix = ''
	suffix = " with forwarded event: %p" % [ forwarded_event ] if forwarded_event

	self.log.warn( "Finishing bolt %s%s" % [ self.id, suffix ] )

	return self.parent.tell( finish_bolt: [self, forwarded_event] )
end
Ravn::Inspection API: Return the detail part of the inspect output.
# Ravn::Inspection API: Return the detail part of the inspect output. This is
# the superclass's details prefixed with a space, or an empty string if there
# is no superclass implementation or it returns nothing.
def inspect_details
	return '' unless defined?( super )

	inherited_details = super
	return inherited_details ? " #{inherited_details}" : ''
end
send_message( type, fields )
Create and send a message of the given type
with the specified fields
.
# Create and send a message of the given +type+ with the specified +fields+.
# Placeholders in +type+ are expanded by the bolt's class before the
# Ravn::BDE::Message is built; the message is then logged and passed to
# #filter_up. Returns the message.
def send_message( type, fields )
	expanded_type = self.class.expand_placeholders( type )
	outgoing = Ravn::BDE::Message.new( expanded_type, fields )

	self.log.info( "Sending bolt message: %p" % [ outgoing ] )
	self.filter_up( outgoing )

	return outgoing
end