Ravn::Net::Gateway::
C2
class
The gateway node for up-and-out C2 events. This selects events, transforms them into global event types, and queues them up for transmission when communications are established.
It will eventually also unpack events travelling the other way, for down-and-in events from C2 elements.
Refs: - rubybunny.info - www.rabbitmq.com/documentation.html
This node consumes: - net.gps.position - sys.gps.position - bolt.finished.*
- DEFAULT_PUBLISH_OPTIONS
Default options for publication
- PUBLICATION_INTERVAL
How many seconds to wait between attempts to publish queued messages
- RECONNECT_ERROR_TYPES
Types of errors that cause a retry when publishing
- queue R
The queue of outgoing messages that will be sent when the connection is up
- timer R
The Concurrent::TimerTask that periodically attempts to publish any queued messages.
- uri R
The URI of the exchange to relay through
Create a new (unstarted) C2Gateway
# Set up a new gateway without starting it: remember the broker URI, build
# the (unstarted) Bunny session, and create the publication timer. Any
# arguments are passed through untouched to the superclass.
def initialize( * )
  broker = self.class.broker_uri
  @uri = broker
  @conn = Bunny.new( broker, verify_peer: self.class.verify_peer )

  # Nothing has been opened on the session yet.
  @exchange = nil
  @channel = nil

  # Outgoing entries wait here until they can be published.
  @queue = []
  @timer = self.make_publication_timer

  super
end
Actor event handler – attempt to publish any events that are in the queue.
# Actor event handler: publish queued entries in order, oldest first. An
# entry is only removed from the queue after it has been published; the
# first entry that fails to publish stops the run and stays at the head of
# the queue.
def handle_publish( * )
  outgoing = self.queue

  until outgoing.empty?
    head = outgoing.first
    published = self.publish_payload( *head )
    break unless published

    self.log.debug { "Successfully published %p" % [head] }
    outgoing.shift
  end
end
Returns a truthy value if there are messages waiting to be sent.
# Report whether any outgoing entries are still waiting in the queue.
def messages_queued?
  pending = self.queue
  !pending.empty?
end
relay_bolt_message( message )
# Relay the given +message+ if the bolt it describes is flagged for
# relaying. Messages without a :bolt entry in their data are ignored
# silently; bolts without a truthy :is_relayed are logged and skipped.
# Returns nil when nothing was relayed.
def relay_bolt_message( message )
  bolt = message.data[ :bolt ]
  return unless bolt

  if bolt[ :is_relayed ]
    return self.relay_message( message )
  end

  self.log.info "Bolt %s is not configured to be relayed." % [ bolt[:id] ]
  return
end
# Encode +message+ with the global protocol and queue the result for
# publication. If the encoder yields no event name, the message is dropped
# with a warning and nil is returned; otherwise the result of #enqueue is
# returned.
def relay_message( message )
  self.log.debug "Sending a %s message" % [ message.type ]

  encoded = Ravn::Net::GlobalProtocol.encode( message )
  eventname, eventid, payload, headers = encoded

  if eventname
    self.enqueue( eventname, eventid, payload, headers )
  else
    self.log.warn "Ignoring message: no event name"
    nil
  end
end
Start trying to connect to the exchange to relay events.
# Start the publication timer so queued events are periodically offered to
# the exchange. Returns whatever the timer's #execute returns.
def start
  publication_timer = self.timer
  publication_timer.execute
end
Returns true if the gateway has been started.
# Report whether the gateway is running, as told by the publication timer.
def started?
  publication_timer = self.timer
  return publication_timer.running?
end
Protected Instance Methods
Return a Bunny::Channel for the current thread.
# Return the memoized AMQP channel, creating it (with publisher confirms
# selected) on first use. The connection is always looked up first, so nil
# is returned whenever no session is available, even if a channel was
# created earlier.
def channel
  session = self.conn
  return nil unless session

  unless @channel
    self.log.info "Creating AMQP channel"
    fresh = session.create_channel
    fresh.confirm_select
    # Only memoize once the channel is fully set up.
    @channel = fresh
  end

  return @channel
end
The AMQP connection the gateway uses to relay events (a Bunny::Session object). Returns nil in the case where the session can't be started.
# Return the AMQP session, starting it first if it is not currently open.
#
# Starting the session discards any memoized channel and exchange. Returns
# nil (after logging a warning) if starting raises a Bunny::Exception.
def conn
  return @conn if @conn.open?

  begin
    @channel = nil
    @exchange = nil

    self.log.info "Starting C2 gateway; relaying through %p." % [ @conn ]
    @conn.start

    # Read the server properties straight from the session instead of
    # re-entering this method: if the session reports not-open right after
    # #start, each re-entrant call would start it again and recurse without
    # bound.
    properties = @conn.server_properties
    self.log.info "Connected to %s v%s server: %s" % [
      properties['product'],
      properties['version'],
      capabilities_list( properties['capabilities'] ),
    ]

    return @conn
  rescue Bunny::Exception => err
    self.log.warn "%p while starting up: %s" % [ err.class, err.message ]
    return nil
  end
end
enqueue( eventname, eventid, payload, opts={} )
Add an entry to the queue for the given eventname, eventid, payload and opts.
# Add an entry to the outgoing queue and ask the actor to publish.
#
# The publish options are built from DEFAULT_PUBLISH_OPTIONS, overlaid with
# the caller's +opts+, and then with the routing key (+eventname+), the
# payload's encoding name, the current time, and the message ID (+eventid+).
# The caller's hash is not modified. A nil +opts+ is treated like an empty
# hash, because #relay_message passes through whatever the encoder returned
# and Hash#merge raises TypeError for nil.
#
# Returns the result of #tell.
def enqueue( eventname, eventid, payload, opts={} )
  opts = DEFAULT_PUBLISH_OPTIONS.merge( opts || {} )
  opts.merge!(
    routing_key: eventname,
    content_encoding: payload.encoding.name,
    timestamp: Time.now.to_f,
    message_id: eventid,
  )

  self.log.debug "Queueing a new %s message." % [ eventname ]

  # Entries are frozen pairs of [ payload, publish options ].
  self.queue << [ payload, opts ].freeze
  self.tell( [:publish] )
end
Return a Bunny::Exchange for the current thread.
# Return the memoized topic exchange, declaring it passively on first use
# and registering #handle_unpublished_messages as its on_return handler.
# The channel is always looked up first, so nil is returned whenever no
# channel is available.
def exchange
  chan = self.channel
  return nil unless chan

  unless @exchange
    self.log.info "Creating AMQP exchange %s" % [ self.class.exchange ]
    declared = chan.topic( self.class.exchange, passive: true )
    declared.on_return( &self.method(:handle_unpublished_messages) )
    # Only memoize once the return handler is attached.
    @exchange = declared
  end

  return @exchange
end
handle_missed_messages( nacked_set )
Handle any messages that were missed.
# Log an error for every message ID in +nacked_set+. Returns whatever
# +nacked_set+'s #each returns.
def handle_missed_messages( nacked_set )
  nacked_set.each { |message_id| self.log.error( "Message %d got NACK." % [ message_id ] ) }
end
handle_unpublished_messages( return_info, properties, payload )
Handle messages that would have been dropped by the broker.
# Return handler for the exchange: record, at debug level, the details of a
# message handed back by the broker. The message is only logged.
def handle_unpublished_messages( return_info, properties, payload )
  description = format( "Upstream dropped message: %p, %p, %p", return_info, properties, payload )
  self.log.debug( description )
end
Return the details of inspect output.
# Build the detail portion of the inspect output: the global protocol
# version, the parts of the configured broker URI, and the exchange name.
def inspect_details
  gateway_class = self.class
  broker = Bunny::Session.parse_uri( gateway_class.broker_uri )

  return format(
    " v%p messages -> %s://%s:%d%s [%s]",
    Ravn::Net::GlobalProtocol::VERSION,
    broker[:scheme],
    broker[:host],
    broker[:port],
    broker[:vhost],
    gateway_class.exchange
  )
end
Make a TimerTask that will call publish periodically.
# Build (but do not start) the timer task that asks the actor to publish.
# The task runs every PUBLICATION_INTERVAL seconds and is observed by a
# logging observer.
def make_publication_timer
  publisher = Concurrent::TimerTask.new { self.tell([:publish]) }

  publisher.tap do |task|
    task.execution_interval = PUBLICATION_INTERVAL
    task.add_observer( Ravn::LoggingTaskObserver.new(:publication_timer) )
  end
end
publish_payload( payload, options )
Send the specified payload to AMQP with the given options.
# Publish +payload+ to the exchange with the given +options+ and wait for
# the broker's confirmation.
#
# Returns false if no channel or exchange is available, or if publishing
# raised one of the RECONNECT_ERROR_TYPES (in which case the error is logged
# and the gateway is reset). Returns true otherwise, including when the
# broker NACKed the message; NACKs are passed to #handle_missed_messages.
def publish_payload( payload, options )
  channel = self.channel or return false
  exchange = self.exchange or return false

  self.log.debug "Publishing to AMQP exchange '%s'" % [ exchange.name ]
  exchange.publish( payload, options )
  channel.wait_for_confirms or self.handle_missed_messages( channel.nacked_set )

  return true
rescue *RECONNECT_ERROR_TYPES => err
  # Report why publishing failed before resetting; otherwise the cause of
  # the reconnect is lost.
  self.log.warn "%p while publishing: %s" % [ err.class, err.message ]
  self.reset
  return false
end
Reset the AMQP connection, forcing the gateway to attempt to reconnect.
# Drop the memoized exchange and channel so they are recreated on next use,
# closing the channel first if it is still open. Returns nil.
#
# The channel reference is cleared even if closing it raises, so a dead
# channel never stays memoized. The error from #close still propagates to
# the caller.
def reset
  self.log.warn "Resetting channel and exchange."
  @exchange = nil

  begin
    @channel.close if @channel&.open?
  ensure
    @channel = nil
  end

  return nil
end