This is a Ruby interface to the Silvus Streamcaster 4000 radio.
It does not currently expose the entire set of API calls, limiting itself to the most immediately useful ones.
It currently assumes private networks, so it omits authentication and uses HTTP instead of HTTPS.
- CRYPTO_PROFILES
Encryption profile algorithm map.
- DEFAULT_TIMEOUT
Maximum time to wait for a radio response.
- RESPONSE_PARSER
A method -> response parser map, for munging result data into Ruby-ish values per API call.
- ROUTING_PROTOCOLS
Valid routing protocols.
- SKIP_FLASH_WRITE
API calls that should never have a follow-up save to flash.
- connection R
A persistent Net::HTTP connection.
- endpoint R
The API endpoint URI for a specific radio.
- ip R
This radio’s IP address.
- label RW
An arbitrary label for identification.
Instantiate a new Ravn::Silvus object for the radio at the given IP address.
def initialize( ip )
@ip = ip.to_s
@endpoint = URI( '' )
@label = self.ip
@batching = false
@batch = {}
self.setup_connection
end
Return an array of all known node identifiers participating in the current mesh network.
def all_nodes
return self.network_status.each_with_object( Set.new ) do |link, acc|
acc << link[ :node_1 ]
acc << link[ :node_2 ]
end.to_a.sort
end
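As a rough illustration of the de-duplication above, this sketch runs the same accumulation over hand-written link data. The links array is made up; only the :node_1 and :node_2 keys are taken from the method, and the rest of a network_status entry is not shown here.

```ruby
require 'set'

# Hypothetical link records; the node ids are invented for the example.
links = [
  { node_1: 1203, node_2: 877 },
  { node_1: 877,  node_2: 4410 },
  { node_1: 1203, node_2: 4410 }
]

# Collect both ends of every link into a Set so each node appears once.
nodes = links.each_with_object( Set.new ) do |link, acc|
  acc << link[ :node_1 ]
  acc << link[ :node_2 ]
end.to_a.sort

p nodes
```

Three links between three radios collapse to three sorted node ids.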
Get/set the mesh bandwidth in MHz. All radios must be using the same bandwidth to function.
def bandwidth( bw=nil )
bw = Array( bw.to_s ) if bw
return self.queue( :bw, bw, label: __method__ )
end
Batch commands together while within the block. Returns a hash of results, keyed by method name.
def batch( &block )
@batching = true
yield( self )
result = self.send_request
return result
ensure
@batching = false
@batch.clear
end
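The batching flag and the ensure clause can be sketched with a stand-in class. FakeRadio is hypothetical and only records commands; unlike the real method it returns an array of the queued commands rather than a hash of parsed results.

```ruby
# Hypothetical stand-in class; it records commands instead of talking to a radio.
class FakeRadio
  def initialize
    @batching = false
    @batch    = []
  end

  # Queue a command; "send" it right away unless a batch is open.
  def queue( command )
    @batch << command
    return if @batching
    return self.flush
  end

  # Pretend to send everything queued so far, returning what was "sent".
  def flush
    sent = @batch.dup
    @batch.clear
    return sent
  end

  # Hold commands while the block runs, then send them in one go.
  def batch
    @batching = true
    yield( self )
    return self.flush
  ensure
    # Runs even if the block raises, so a failed batch cannot leak state.
    @batching = false
    @batch.clear
  end
end

radio = FakeRadio.new
p radio.batch {|r| r.queue( :version ); r.queue( :nodeid ) }
p radio.queue( :temperature )
```

The ensure clause is what lets a later single call behave normally after a block has raised.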
Return the current battery charge, if the radio model supports it.
def battery_level
return self.queue( :battery_percent, nil, label: __method__ )
end
Massage a batched message into a deferred update.
NOTE: As of 2024/01, this is undocumented by the Silvus API, and is emulated from network captures of the web interface. (This uses a completely different endpoint path, outside of the regular Silvus API.)
def defer( &block )
nodes = self.all_nodes
@batching = true
yield( self )
apis = @batch.values.each_with_object( [] ) do |cmd, acc|
meth, params = cmd[ :payload ].values_at( :method, :params )
acc << { method: meth, params: params }
end
payload = {
apis: [{
method: 'deferred_execution_api',
params: {
version: "1",
sleep: "3",
api_list: apis
}
}],
nodeids: nodes
}
result = self.send_request( payload: payload, path: '/bcast_enc.pyc' )
return result
ensure
@batching = false
@batch.clear
end
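To make the wire format easier to picture, this sketch builds the same deferred payload from a hand-written batch. The request ids, node ids and parameter values are invented; the payload layout itself is copied from the method above.

```ruby
require 'json'

# Hypothetical queued commands, shaped like the entries #queue stores in @batch.
batch = {
  'id-1' => { payload: { jsonrpc: '2.0', method: :freq, id: 'id-1', params: [ '2450' ] } },
  'id-2' => { payload: { jsonrpc: '2.0', method: :bw,   id: 'id-2', params: [ '20' ] } }
}
nodes = [ 877, 1203 ]  # made-up node ids

# Keep only the method and params of each queued call.
apis = batch.values.map do |cmd|
  meth, params = cmd[ :payload ].values_at( :method, :params )
  { method: meth, params: params }
end

payload = {
  apis: [{
    method: 'deferred_execution_api',
    params: { version: "1", sleep: "3", api_list: apis }
  }],
  nodeids: nodes
}

puts JSON.pretty_generate( payload )
```

The JSON-RPC envelope (jsonrpc and id) is dropped from each call; only method and params travel inside api_list.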
dhcp_on_mesh( enable=nil )
Enable or disable DHCP packets from this radio travelling across the mesh, effectively making this radio serve clients behind siblings.
Note: This is currently undocumented in the Silvus API.
def dhcp_on_mesh( enable=nil )
param = case enable
when false
[ "1" ]
when true
[ "0" ]
else
nil
end
return self.queue( :mesh_dhcp_block_enable, param, label: __method__ )
end
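The underlying call here is a "block" flag, so the boolean is inverted before it is sent; encryption and virtual_network follow the same pattern with their "disable" calls. A minimal sketch of that mapping, using a hypothetical helper name (inverted_flag) that does not exist in the class:

```ruby
# Hypothetical helper (not part of Ravn::Silvus): convert an "enable" boolean
# into the parameter list for a "disable"-style API call.
def inverted_flag( enable )
  case enable
  when false then [ "1" ]   # disable requested: set the block/disable flag
  when true  then [ "0" ]   # enable requested: clear the block/disable flag
  end                       # anything else (e.g. nil) falls through to nil
end

p inverted_flag( true )
p inverted_flag( false )
p inverted_flag( nil )
```

A nil result means no params are attached to the request, which makes the call a read rather than a write.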
dhcp_settings( params=nil )
Get or set the virtual network DHCP settings. When setting values, all parameters must be present for the change to take effect. Parameters are currently:
- dhcp: “on” or “off”
- dhcp_start: The IP address begin range for clients
- dhcp_end: The IP address end range for clients
- dhcp_subnet: The network subnet provided to clients
- dhcp_router: The gateway address provided to clients (usually this radio’s virtual IP address)
- lease_time: How often IPs are put back into the pool
Note: None of this is currently documented in the Silvus API.
def dhcp_settings( params=nil )
if params
params.transform_keys!( &:to_s )
params.transform_values!( &:to_s )
required = %w[ dhcp dhcp_router dhcp_subnet dhcp_start dhcp_end lease_time ]
missing = required - params.keys
unless missing.empty?
raise InputError, "Unable to set dhcp settings, missing: %p" % [ missing ]
end
end
return self.queue( __method__, params )
end
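The required-parameter check can be tried in isolation. The values below are made up (including the format of lease_time, which is a guess); only the key names come from the description above.

```ruby
# Invented settings with dhcp_router deliberately left out.
params = {
  dhcp:        'on',
  dhcp_start:  '10.10.0.100',
  dhcp_end:    '10.10.0.200',
  dhcp_subnet: '255.255.255.0',
  lease_time:  3600
}

# Normalize keys and values to strings, then list whatever is absent.
params   = params.transform_keys( &:to_s ).transform_values( &:to_s )
required = %w[ dhcp dhcp_router dhcp_subnet dhcp_start dhcp_end lease_time ]
missing  = required - params.keys

p missing
```

With a non-empty missing list, the real method raises InputError instead of sending a partial update.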
Get/set the mesh maximum distance between units in meters. All radios must be using the same distance to function.
def distance( meters=nil )
meters = Array( meters.to_s ) if meters
return self.queue( :max_link_distance, meters, label: __method__ )
end
Emit the configuration values we’re concerned with.
def dump_config
config = self.batch do
self.virtual_network
self.virtual_ip_address
self.virtual_ip_gateway
self.virtual_ip_netmask
self.dhcp_settings
self.dhcp_on_mesh
self.mesh_network
self.bandwidth
self.frequency
self.routing_protocol
self.encryption
self.encryption_algorithm
self.encryption_key
self.encryption_hmac_key
self.encryption_wrap_key
self.encryption_rfauth_key
self.encryption_rfbcast_key
end
config[ :virtual_ip_address ] = config[ :virtual_ip_address ].to_s
config[ :virtual_ip_gateway ] = config[ :virtual_ip_gateway ].to_s
return config
end
Enable/disable encrypted communication across the mesh.
def encryption( enable=nil )
param = case enable
when false
[ "1" ]
when true
[ "0" ]
else
nil
end
rv = self.queue( :enc_disable, param, label: __method__ )
if param
self.queue( :enc_disable_confirm, param, label: __method__ )
end
return rv
end
encryption_algorithm( algorithm=nil, key_length=256 )
Get/set the current encryption in use.
def encryption_algorithm( algorithm=nil, key_length=256 )
params = nil
if algorithm
algos = CRYPTO_PROFILES.invert
unless algos.keys.include?( algorithm )
raise InputError, "Unknown algorithm. Should be one of: %p" % [ algos.keys ]
end
params = [ algos[ algorithm ].to_s, key_length.to_s ]
end
return self.queue( :enc_profile, params, label: __method__ )
end
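The lookup relies on inverting the profile map so that an algorithm name leads back to its profile id. In this sketch the map contents are placeholders, not the real CRYPTO_PROFILES values, which are not shown in this section.

```ruby
# Placeholder map: these ids and names are illustrative only.
crypto_profiles = { 0 => :des, 1 => :aes_gcm }

# Invert so the algorithm name becomes the key.
algos = crypto_profiles.invert

algorithm  = :aes_gcm
key_length = 256

unless algos.key?( algorithm )
  raise ArgumentError, "Unknown algorithm. Should be one of: %p" % [ algos.keys ]
end

# The API takes both values as strings.
params = [ algos[ algorithm ].to_s, key_length.to_s ]
p params
```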
encryption_hmac_key( key=nil )
Get/set the encryption HMAC key for use across the mesh (AES-GCM).
def encryption_hmac_key( key=nil )
key = Array( key ) if key
return self.queue( :enc_hmac_key, key, label: __method__ )
end
encryption_key( key=nil )
Get/set the encryption key for use across the mesh (DES). This will be automatically truncated to the first 32 characters.
def encryption_key( key=nil )
key = Array( key[0..31] ) if key
return self.queue( :enc_key, key, label: __method__ )
end
encryption_rfauth_key( key=nil )
Get/set the encryption unicast key for use across the mesh (AES-GCM).
def encryption_rfauth_key( key=nil )
key = Array( key ) if key
return self.queue( :enc_rf_auth_key, key, label: __method__ )
end
encryption_rfbcast_key( key=nil )
Get/set the encryption broadcast key for use across the mesh (AES-GCM).
def encryption_rfbcast_key( key=nil )
key = Array( key ) if key
return self.queue( :enc_rf_bcast_auth_key, key, label: __method__ )
end
encryption_wrap_key( key=nil )
Get/set the encryption wrap key for use across the mesh (AES-GCM).
def encryption_wrap_key( key=nil )
key = Array( key ) if key
return self.queue( :enc_wrap_key, key, label: __method__ )
end
freq_bandwidth( freq, bw )
Set frequency and bandwidth in one hit. This doesn’t feel like it should be necessary, but the lag for setting them individually is quite high.
def freq_bandwidth( freq, bw )
return self.queue( :freq_bw, [ freq.to_s, bw.to_s ], label: __method__ )
end
Get/set the mesh frequency in MHz. All radios must be on the same frequency to function.
def frequency( freq=nil )
freq = Array( freq.to_s ) if freq
return self.queue( :freq, freq, label: __method__ )
end
Fetch the radio’s current latitude, longitude, and altitude. Returns a labeled struct.
def gps_coordinates
return self.queue( __method__ )
end
Returns the accuracy of the GPS antenna, one of:
- :none -> no antenna or no satellites found
- :basic -> 10m accuracy, no altitude
- :full -> 3m accuracy w/ altitude
def gps_lock_level
return self.queue( :gps_mode, nil, label: __method__ )
end
Returns the time as seen by GPS, or nil if there is no current PPS signal.
def gps_time
return self.queue( __method__ )
end
Predicate: Is this radio connected to others?
def mesh_connected?
return ! self.network_status.empty?
end
Get or set the mesh network name. All participating radios need to have the same name.
def mesh_network( name=nil )
name = Array( name ) if name
return self.queue( :nw_name, name, label: __method__ )
end
Returns the complete list of all wireless links this radio knows about, in relation to the rest of the network.
def network_status
return self.queue( __method__ )
end
Return the radio’s 16-bit unique node id.
def nodeid
return self.queue( __method__ )
end
Immediately reboot the radio.
def reset
return self.queue( :radio_reset, nil, label: __method__ )
end
routing_protocol( proto=nil )
Get/set the radio’s routing protocol: either ‘legacy’, or ‘path_vector’ for large networks. This requires a recent SS4 firmware.
def routing_protocol( proto=nil )
if proto
unless ROUTING_PROTOCOLS.include?( proto.to_sym )
raise InputError, "Unknown protocol. Should be one of: %p" % [ ROUTING_PROTOCOLS ]
end
proto = Array( proto )
end
return self.queue( :routing_proto, proto, label: __method__ )
end
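A small sketch of the validation step, assuming ROUTING_PROTOCOLS holds the two names mentioned in the description as symbols. The helper name valid_protocol? is hypothetical.

```ruby
# Hypothetical helper; the protocol list is assumed from the description above.
def valid_protocol?( proto )
  %i[ legacy path_vector ].include?( proto.to_sym )
end

p valid_protocol?( 'legacy' )
p valid_protocol?( :path_vector )
p valid_protocol?( 'something_else' )
```

Calling to_sym first is what lets callers pass either a string or a symbol.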
Retrieve common radio runtime values.
def status
stat = self.batch do
self.version
self.nodeid
self.battery_level
self.network_status
self.mesh_network
self.gps_lock_level
self.gps_coordinates
self.gps_time
self.temperature
end
stat.merge!( mesh_connected: ! stat[ :network_status ].empty? )
return stat
end
Return the current radio temperature in Fahrenheit.
def temperature
return self.queue( :read_current_temperature, nil, label: __method__ )
end
Returns the radio API version.
def version
return self.queue( __method__ )
end
virtual_ip_address( ip=nil )
Set/get this radio’s IP address on the virtual network.
def virtual_ip_address( ip=nil )
ip = Array( ip.to_s ) if ip
return self.queue( __method__, ip )
end
virtual_ip_gateway( ip=nil )
Set/get this radio’s virtual network gateway.
def virtual_ip_gateway( ip=nil )
ip = Array( ip.to_s ) if ip
return self.queue( __method__, ip )
end
virtual_ip_netmask( mask=nil )
Set/get netmask for the virtual network.
def virtual_ip_netmask( mask=nil )
mask = Array( mask.to_s ) if mask
return self.queue( __method__, mask )
end
virtual_network( enable=nil )
Enable or disable the virtual IP network across the mesh.
def virtual_network( enable=nil )
param = case enable
when false
[ "1" ]
when true
[ "0" ]
else
nil
end
return self.queue( :virtual_ip_disable, param, label: __method__ )
end
write_flash( setting=nil )
Writes the setting argument to the radio’s permanent flash. Depending on the radio model and firmware, different settings require this. Without a specific setting, this saves all current values.
def write_flash( setting=nil )
param = Array( setting.to_s ) if setting
setting ||= "all"
return if SKIP_FLASH_WRITE.include?( setting.to_sym )
return self.queue( :setenvlinsingle, param, label: "save_#{setting}".to_sym )
end
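The skip-list decision and the generated label can be sketched on their own. The skip list below is an assumption, since the real SKIP_FLASH_WRITE contents are not shown in this section, and save_label is a hypothetical name.

```ruby
# Assumed skip list; the real constant may contain different calls.
skip = %i[ radio_reset setenvlinsingle ]

# Return the label a flash write would be queued under, or nil when skipped.
save_label = lambda do |setting|
  setting ||= "all"
  return nil if skip.include?( setting.to_sym )
  "save_#{setting}".to_sym
end

p save_label.call( :freq )
p save_label.call( nil )
p save_label.call( :radio_reset )
```

Keeping the save call itself on the skip list matters: queue triggers write_flash after any call that carries params, and the save request carries params too.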
Protected Instance Methods
Raise a GenericError with a formatted message if a JSON-RPC reply contains an error.
def check_error( rpc )
return unless rpc[ 'error' ]
err = "%s (%s): %s" % [
@batch.dig( rpc['id'], :command ),
rpc.dig( 'error', 'code' ),
rpc.dig( 'error', 'message' )
]
self.log.error "Error for command: %p" % [ @batch[rpc['id']] ]
raise GenericError, err
end
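For reference, this is roughly what the formatted message looks like. The reply below is hand-written: -32601 is the standard JSON-RPC "Method not found" code, but whether the radio returns it is an assumption.

```ruby
require 'json'

# Hypothetical error reply and the matching batch entry it would be looked up in.
rpc   = JSON.parse( '{"jsonrpc":"2.0","id":"abc","error":{"code":-32601,"message":"Method not found"}}' )
batch = { 'abc' => { command: :nodeid } }

err = "%s (%s): %s" % [
  batch.dig( rpc['id'], :command ),
  rpc.dig( 'error', 'code' ),
  rpc.dig( 'error', 'message' )
]

puts err
```

The reply id is what ties an error back to the command that caused it.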
For a single RPC reply, find the lambda responsible for providing a ruby-ish result.
def parse_jsonrpc( rpc )
label = @batch.dig( rpc['id'], :label )
result = rpc[ 'result' ]
meth = RESPONSE_PARSER[ label ] || RESPONSE_PARSER[ :__default ]
return meth.call( result )
end
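The dispatch can be pictured as a hash of lambdas with a fallback entry. The parser table here is hypothetical, as are the raw result shapes; the real RESPONSE_PARSER entries are not shown in this section.

```ruby
# Hypothetical parser table keyed by label, with a pass-through default.
parsers = {
  battery_level: ->( result ) { result.first.to_f },
  __default:     ->( result ) { result }
}

# Pick the parser for a label, falling back to the default entry.
parse = lambda do |label, result|
  meth = parsers[ label ] || parsers[ :__default ]
  meth.call( result )
end

p parse.call( :battery_level, [ "87.5" ] )
p parse.call( :nodeid, [ "1203" ] )
```

Keying on the label rather than the raw API method is what lets a friendly name such as battery_level select its own conversion.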
parse_response( response )
Given a Net::HTTP response, return the values after any necessary conversion(s).
def parse_response( response )
payload = JSON.parse( response.body )
unless @batching
rv = payload.is_a?( Array ) ? payload.first : payload
self.check_error( rv )
return self.parse_jsonrpc( rv )
end
return payload.each_with_object( {} ) do |rv, acc|
next unless rv.is_a?( Hash )
self.check_error( rv )
command = @batch.dig( rv['id'], :label )
acc[ command ] = self.parse_jsonrpc( rv )
end
ensure
@batch.clear
end
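In batch mode, the replies are matched back to their labels by request id. A simplified sketch with a hand-written response body follows; the ids and values are invented, and the per-label parsing step is left out.

```ruby
require 'json'

# Hypothetical batch bookkeeping and a matching JSON-RPC batch reply.
batch = {
  'id-1' => { label: :frequency },
  'id-2' => { label: :bandwidth }
}
body = '[{"jsonrpc":"2.0","id":"id-1","result":["2450"]},' \
       '{"jsonrpc":"2.0","id":"id-2","result":["20"]}]'

# Build a hash keyed by label, skipping anything that is not a reply object.
results = JSON.parse( body ).each_with_object( {} ) do |rv, acc|
  next unless rv.is_a?( Hash )
  acc[ batch.dig( rv['id'], :label ) ] = rv[ 'result' ]
end

p results
```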
queue( command, params=nil, label: nil )
Add a wrapped JSON-RPC request for the command to the queue. Outside of a batch, the request is sent immediately.
def queue( command, params=nil, label: nil )
req_id = SecureRandom.uuid
@batch[ req_id ] = {
command: command,
label: label || command,
}
payload = {
jsonrpc: '2.0',
method: command,
id: req_id
}
payload.merge!( params: params ) if params
@batch[ req_id ].merge!( payload: payload )
if @batching
self.write_flash( command ) if params
return
end
rv = self.send_request
self.write_flash( command ) if params
return rv
end
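The request envelope itself is small. This sketch mirrors the payload built above in a stand-alone helper; build_request is a hypothetical name and the frequency value is invented.

```ruby
require 'json'
require 'securerandom'

# Hypothetical stand-alone helper mirroring the payload built in #queue.
def build_request( command, params=nil )
  payload = { jsonrpc: '2.0', method: command, id: SecureRandom.uuid }
  # A "get" carries no params key at all; a "set" carries an array of strings.
  payload.merge!( params: params ) if params
  return payload
end

get = build_request( :freq )
set = build_request( :freq, [ '2450' ] )

puts JSON.generate( [ get, set ] )
```

Each request gets its own UUID, which is how replies in a batch are matched back to their commands.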
send_request( payload: nil, path: nil )
Ship the current batch payload to the radio, returning parsed results.
def send_request( payload: nil, path: nil )
req = Net::HTTP::Post.new( path || self.endpoint.path )
req.set_content_type( 'application/json' )
payload ||= @batch.values.map{|c| c[:payload] }
req.body = JSON.generate( payload )
self.connection.start unless self.connection.started?
response = self.connection.request( req )
return self.parse_response( response )
rescue Net::ReadTimeout => err
@batch.clear
self.log.error "Timeout while waiting on response: %s" % [ err.message ]
end
Set up a persistent connection.
def setup_connection
self.endpoint.scheme = 'http'
self.endpoint.host = self.ip
self.endpoint.path = '/streamscape_api'
@connection = Net::HTTP.new( self.endpoint.host, self.endpoint.port )
self.connection.set_debug_output( $stderr ) if $DEBUG
self.connection.read_timeout = DEFAULT_TIMEOUT
self.connection.open_timeout = DEFAULT_TIMEOUT
return
end
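The same setup can be tried without a radio, since Net::HTTP.new only configures the object and opens no socket. The address below is a documentation-only IP and the timeout is an assumed value; DEFAULT_TIMEOUT is not shown in this section.

```ruby
require 'net/http'
require 'uri'

ip      = '192.0.2.10'  # documentation-only address, stands in for a real radio
timeout = 5             # assumed value in seconds

# Start from an empty URI and fill in the pieces, as the method above does.
endpoint = URI( '' )
endpoint.scheme = 'http'
endpoint.host   = ip
endpoint.path   = '/streamscape_api'

# A generic URI has no port, so Net::HTTP falls back to its default of 80.
connection = Net::HTTP.new( endpoint.host, endpoint.port )
connection.read_timeout = timeout
connection.open_timeout = timeout

puts endpoint
puts connection.started?
```

The connection is only opened later, on the first request, and is then reused for subsequent calls.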