Ravn::Net::Gateway::

C2 class

The gateway node for up-and-out C2 events. This selects events, transforms them into global event types, and queues them up for transmission when communications are established.

It will eventually unpack them the other way for down-and-in events from C2 elements.

Refs: - rubybunny.info - www.rabbitmq.com/documentation.html

This node consumes: - net.gps.position - sys.gps.position - bolt.finished.*

Constants

DEFAULT_PUBLISH_OPTIONS

Default options for publication

PUBLICATION_INTERVAL

How many seconds to try to publish

RECONNECT_ERROR_TYPES

Types of errors that cause a retry when publishing

Attributes

queue R

The queue of outgoing messages that will be sent when the connection is up

timer R

The Concurrent::TimerTask that periodically attempts to publish any queued messages.

uri R

The URI of the exchange to relay through

Public Class Methods

new( * )

Create a new (unstarted) C2Gateway

# File lib/ravn/net/gateway/c2.rb, line 98
def initialize( * )
        @uri      = self.class.broker_uri

        @conn     = Bunny.new( @uri, verify_peer: self.class.verify_peer )
        @channel  = nil
        @exchange = nil

        @queue    = []
        @timer    = self.make_publication_timer

        super
end

Public Instance Methods

handle_publish( * )

Actor event handler – attempt to publish any events that are in the queue.

# File lib/ravn/net/gateway/c2.rb, line 173
def handle_publish( * )
        until self.queue.empty?
                entry = self.queue.first

                break unless self.publish_payload( *entry )

                self.log.debug { "Successfully published %p" % [entry] }
                self.queue.shift
        end
end
messages_queued?()

Returns a truthy value if there are messages waiting to be sent.

# File lib/ravn/net/gateway/c2.rb, line 139
def messages_queued?
        return ! self.queue.empty?
end
relay_bolt_message( message )
# File lib/ravn/net/gateway/c2.rb, line 146
def relay_bolt_message( message )
        bolt = message.data[ :bolt ] or return
        unless bolt[ :is_relayed ]
                self.log.info "Bolt %s is not configured to be relayed." % [ bolt[:id] ]
                return
        end

        return self.relay_message( message )
end
relay_message( message )
# File lib/ravn/net/gateway/c2.rb, line 159
def relay_message( message )
        self.log.debug "Sending a %s message" % [ message.type ]
        eventname, eventid, payload, headers = Ravn::Net::GlobalProtocol.encode( message )

        unless eventname
                self.log.warn "Ignoring message: no event name"
                return
        end

        self.enqueue( eventname, eventid, payload, headers )
end
start()

Start trying to connect to the exchange to relay events.

# File lib/ravn/net/gateway/c2.rb, line 127
def start
        self.timer.execute
end
started?()

Returns true if the gateway has been started.

# File lib/ravn/net/gateway/c2.rb, line 133
def started?
        return self.timer.running?
end

Protected Instance Methods

channel()

Return a Bunny::Channel for the current thread.

# File lib/ravn/net/gateway/c2.rb, line 231
def channel
        session = self.conn or return nil

        return @channel ||= begin
                self.log.info "Creating AMQP channel"
                ch = session.create_channel

                # Enable publisher confirmation
                ch.confirm_select

                ch
        end
end
conn()

The AMQP connection the gateway uses to relay events (a Bunny::Session object). Returns nil in the case where the session can’t be started.

# File lib/ravn/net/gateway/c2.rb, line 206
def conn
        return @conn if @conn.open?

        begin
                @channel = nil
                @exchange = nil

                self.log.info "Starting C2 gateway; relaying through %p." % [ @conn ]
                @conn.start

                self.log.info "Connected to %s v%s server: %s" % [
                        self.conn.server_properties['product'],
                        self.conn.server_properties['version'],
                        capabilities_list( self.conn.server_properties['capabilities'] ),
                ]

                return @conn
        rescue Bunny::Exception => err
                self.log.warn "%p while starting up: %s" % [ err.class, err.message ]
                return nil
        end
end
enqueue( eventname, eventid, payload, opts={} )

Add an entry to the queue for the given eventname, eventid, payload and opts.

# File lib/ravn/net/gateway/c2.rb, line 271
def enqueue( eventname, eventid, payload, opts={} )
        opts = DEFAULT_PUBLISH_OPTIONS.merge( opts )
        opts.merge!(
                routing_key: eventname,
                content_encoding: payload.encoding.name,
                timestamp: Time.now.to_f,
                message_id: eventid,
        )

        self.log.debug "Queueing a new %s message." % [ eventname ]
        self.queue << [ payload, opts ].freeze

        self.tell( [:publish] )
end
exchange()

Return a Bunny::Exchange for the current thread.

# File lib/ravn/net/gateway/c2.rb, line 247
def exchange
        channel = self.channel or return nil

        return @exchange ||= begin
                self.log.info "Creating AMQP exchange %s" % [ self.class.exchange ]

                ex = channel.topic( self.class.exchange, passive: true )
                ex.on_return( &self.method(:handle_unpublished_messages) )

                ex
        end
end
handle_missed_messages( nacked_set )

Handle any messages that were missed.

# File lib/ravn/net/gateway/c2.rb, line 308
def handle_missed_messages( nacked_set )
        # :TODO: Publish an event to the local bus describing the nacked_set
        nacked_set.each do |msgid|
                self.log.error "Message %d got NACK." % [ msgid ]
        end
end
handle_unpublished_messages( return_info, properties, payload )

Handle messages that would have been dropped by the broker.

# File lib/ravn/net/gateway/c2.rb, line 317
def handle_unpublished_messages( return_info, properties, payload )
        # :TODO: Queue, aggregate, publish an event to the local bus
        self.log.debug "Upstream dropped message: %p, %p, %p" % [ return_info, properties, payload ]
end
inspect_details()

Return the details of inspect output.

# File lib/ravn/net/gateway/c2.rb, line 324
def inspect_details
        uri = Bunny::Session.parse_uri( self.class.broker_uri )

        return " v%p messages -> %s://%s:%d%s [%s]" % [
                Ravn::Net::GlobalProtocol::VERSION,
                uri[:scheme],
                uri[:host],
                uri[:port],
                uri[:vhost],
                self.class.exchange,
        ]
end
make_publication_timer()

Make a TimerTask that will call publish periodically.

# File lib/ravn/net/gateway/c2.rb, line 195
def make_publication_timer
        timer = Concurrent::TimerTask.new { self.tell([:publish]) }
        timer.execution_interval = PUBLICATION_INTERVAL
        timer.add_observer( Ravn::LoggingTaskObserver.new(:publication_timer) )

        return timer
end
publish_payload( payload, options )

Send the specified payload to AMQP with the given options.

# File lib/ravn/net/gateway/c2.rb, line 288
def publish_payload( payload, options )
        channel = self.channel or return false
        exchange = self.exchange or return false

        self.log.debug "Publishing to AMQP exchange '%s'" % [ exchange.name ]

        exchange.publish( payload, options )
        channel.wait_for_confirms or self.handle_missed_messages( channel.nacked_set )

        # :TODO: Publish an event to the local bus describing the acked_set

        return true

rescue *RECONNECT_ERROR_TYPES => err
        self.reset
        return false
end
reset()

Reset the AMQP connection, forcing the gateway to attempt to reconnect.

# File lib/ravn/net/gateway/c2.rb, line 262
def reset
        self.log.warn "Resetting channel and exchange."
        @exchange = nil
        @channel.close if @channel&.open?
        @channel = nil
end