class Ravn::Net::Gateway::Db
The demo gateway node for up-and-out C2 events that directly inserts events into a PostgreSQL database instead of relaying them through a broker.
This node consumes:
- net.gps.position
- sys.gps.position
- bolt.finished.*
It doesn’t emit any events.
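As a rough illustration of the event types listed above, the following sketch checks whether a given type would be consumed. It assumes the trailing `*` behaves like a shell-style glob; the real matching rules used by Ravn are not shown in this page and may differ. The `consumed_patterns` and `consumed?` helpers, and the `bolt.finished.success` event type, are hypothetical names made up for the example.

```ruby
# Patterns copied from the documentation; the matching rule is an assumption.
def consumed_patterns
  %w[ net.gps.position sys.gps.position bolt.finished.* ]
end

# Hypothetical helper: treat each pattern as a shell-style glob and
# report whether the given event type matches any of them.
def consumed?( event_type )
  consumed_patterns.any? {|pattern| File.fnmatch( pattern, event_type ) }
end

gps_match      = consumed?( 'net.gps.position' )
finished_match = consumed?( 'bolt.finished.success' )
started_match  = consumed?( 'bolt.started.success' )
```

Under the glob assumption, the first two checks match and the third does not.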
- db R
The Sequel database connection
- mission_name R
The name of the current mission
- table R
The dataset for the table to insert into
- uri R
The URI of the database to connect to on startup
Create a new (unstarted) Db gateway.
def initialize( * )
  @uri = self.class.uri
  @mission_name = Ravn::BDE.mission[:name]
  @db = nil
  @table = nil
  @cursor = nil
  Sequel.extension( :core_refinements )
  Sequel.extension( :pg_json )
  Sequel.extension( :pg_json_ops )
  super
end
send_messages( *messages )
def send_messages( *messages )
  messages.each do |message|
    self.log.warn "Gatewaying: %p" % [ message ]
    message.log_audit_trail
    global_type, payload = Ravn::Ontology.globalize( message )
    payload[:data] = Sequel.pg_jsonb( payload[:data] )
    payload[:time] = Time.at( payload[:time] )
    payload[:operation] ||= 'demo'
    payload[:callsign] ||= 'DEMO'
    self.log.warn " translated to a %p message: %p" % [ global_type, payload ]
    self.publish( global_type, payload )
  end
end
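The per-message translation in send_messages can be tried out without a database. The sketch below is a minimal stand-in, not the gateway's own code: `normalize_payload` is a hypothetical helper, `JSON.generate` substitutes for `Sequel.pg_jsonb`, and the sample payload values are invented. It shows the epoch timestamp becoming a `Time` and the `operation` and `callsign` defaults being filled in only when absent.

```ruby
require 'json'

# Hypothetical stand-in for the payload translation done per message.
# JSON.generate is used in place of Sequel.pg_jsonb so that the sketch
# runs without the Sequel gem or a PostgreSQL connection.
def normalize_payload( payload )
  normalized = payload.dup
  normalized[:data] = JSON.generate( normalized[:data] )
  normalized[:time] = Time.at( normalized[:time] )
  normalized[:operation] ||= 'demo'
  normalized[:callsign]  ||= 'DEMO'
  return normalized
end

# Invented sample payload: epoch seconds plus an explicit callsign.
row = normalize_payload({
  data: { lat: 45.5, lon: -122.6 },
  time: 1_700_000_000,
  callsign: 'ALPHA'
})
```

Because the defaults use `||=`, a callsign supplied by the message is kept, while a missing operation falls back to 'demo'.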
Connect to the database and set up the table that events are inserted into.
def start
  self.log.info "Starting database gateway; inserting into %s." % [ self.uri ]
  @db = Sequel.connect( self.uri )
  @db.extension( :pg_json )
  @table = @db[ self.class.table_name ]
end
Protected Instance Methods
Ravn::Inspection API — return the details part of inspect output.
def inspect_details
  return "connected to: %s" % [ self.uri ]
end
Publish an event.
def publish( type, payload )
  self.log.debug "Inserting a '%s' (%p): %s" % [ type, payload, self.table.insert_sql(payload) ]
  self.table.insert( payload )
  self.db.notify( :new_event )
rescue => err
  self.log.error "Unrecoverable %p when inserting an event: %s." % [ err.class, err.message ]
  err.backtrace.each do |frame|
    self.log.debug( frame )
  end
end
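The publish method logs insert failures instead of re-raising them, so one bad event does not stop the gateway. The sketch below illustrates that pattern in isolation, under stated assumptions: `FakeTable` is a hypothetical in-memory stand-in for the Sequel dataset, `publish_safely` is an invented name, and the true/false return value is an addition for the example rather than something the original method provides.

```ruby
require 'logger'
require 'stringio'

# Hypothetical in-memory stand-in for the Sequel dataset.
# It rejects any row that has no :time key.
class FakeTable
  attr_reader :rows

  def initialize
    @rows = []
  end

  def insert( payload )
    raise ArgumentError, "missing :time" unless payload.key?( :time )
    @rows << payload
  end
end

# Same shape as the documented publish method: attempt the insert,
# log any failure, and never let the error propagate to the caller.
def publish_safely( table, payload, log )
  table.insert( payload )
  return true
rescue => err
  log.error "%p when inserting an event: %s" % [ err.class, err.message ]
  return false
end

output = StringIO.new
log    = Logger.new( output )
table  = FakeTable.new

ok     = publish_safely( table, { time: Time.now, callsign: 'DEMO' }, log )
failed = publish_safely( table, { callsign: 'DEMO' }, log )
```

The trade-off of this design is that a failed insert is visible only in the log, so callers that need delivery guarantees would have to check the log or use a return value like the one added here.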