Skip to content

Commit

Permalink
mavproxy_link: make and mark as flake8-clean
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbarker committed Jun 16, 2024
1 parent 12eb912 commit 06d895e
Showing 1 changed file with 99 additions and 68 deletions.
167 changes: 99 additions & 68 deletions MAVProxy/modules/mavproxy_link.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
#!/usr/bin/env python
'''enable run-time addition and removal of master link, just like --master on the cnd line'''
''' TO USE:
link add 10.11.12.13:14550
link list
link remove 3 # to remove 3rd output

'''enable run-time addition and removal of master link, just like --master on the cnd line
AP_FLAKE8_CLEAN
'''

# TO USE:
# link add 10.11.12.13:14550
# link list
# link remove 3 # to remove 3rd output

from pymavlink import mavutil
import time, struct, math, sys, fnmatch, traceback, json, os

import fnmatch
import json
import math
import os
import struct
import sys
import time
import traceback

if sys.version_info[0] >= 3:
import io as StringIO
Expand All @@ -18,15 +30,29 @@
from MAVProxy.modules.lib import mp_util

if mp_util.has_wxpython:
from MAVProxy.modules.lib.mp_menu import *
from MAVProxy.modules.lib.mp_menu import MPMenuSubMenu
from MAVProxy.modules.lib.mp_menu import MPMenuItem
from MAVProxy.modules.lib.wx_addlink import MPMenulinkAddDialog

dataPackets = frozenset(['BAD_DATA','LOG_DATA'])
delayedPackets = frozenset([ 'MISSION_CURRENT', 'SYS_STATUS', 'VFR_HUD',
'GPS_RAW_INT', 'SCALED_PRESSURE', 'GLOBAL_POSITION_INT',
'NAV_CONTROLLER_OUTPUT' ])
activityPackets = frozenset([ 'HEARTBEAT', 'GPS_RAW_INT', 'GPS_RAW', 'GLOBAL_POSITION_INT', 'SYS_STATUS', 'HIGH_LATENCY2' ])
radioStatusPackets = frozenset([ 'RADIO', 'RADIO_STATUS'])
dataPackets = frozenset(['BAD_DATA', 'LOG_DATA'])
delayedPackets = frozenset([
'GLOBAL_POSITION_INT',
'GPS_RAW_INT',
'MISSION_CURRENT',
'NAV_CONTROLLER_OUTPUT',
'SCALED_PRESSURE',
'SYS_STATUS',
'VFR_HUD',
])
activityPackets = frozenset([
'GLOBAL_POSITION_INT',
'GPS_RAW',
'GPS_RAW_INT',
'HEARTBEAT',
'HIGH_LATENCY2',
'SYS_STATUS',
])
radioStatusPackets = frozenset(['RADIO', 'RADIO_STATUS'])

preferred_ports = [
'*FTDI*',
Expand All @@ -45,6 +71,7 @@
'*Qiotek*',
]


class LinkModule(mp_module.MPModule):

def __init__(self, mpstate):
Expand Down Expand Up @@ -86,16 +113,18 @@ def __init__(self, mpstate):

def idle_task(self):
'''called on idle'''
if mp_util.has_wxpython:
if mp_util.has_wxpython:
if self.module('console') is not None:
if not self.menu_added_console:
self.menu_added_console = True
# we don't dynamically update these yet due to a wx bug
self.menu_rm.items = [ MPMenuItem(p, p, '# link remove %s' % p) for p in self.complete_links('') ]
self.menu_rm.items = [
MPMenuItem(p, p, '# link remove %s' % p) for p in self.complete_links('')
]
self.module('console').add_menu(self.menu)
else:
self.menu_added_console = False

for m in self.mpstate.mav_master:
m.source_system = self.settings.source_system
m.mav.srcSystem = m.source_system
Expand All @@ -112,8 +141,8 @@ def idle_task(self):
with open(self.datarate_logging, 'a') as logfile:
for master in self.mpstate.mav_master:
highest_msec_key = (self.target_system, self.target_component)
linkdelay = (self.status.highest_msec.get(highest_msec_key, 0) - master.highest_msec.get(highest_msec_key, 0))*1.0e-3
logfile.write(str(time.strftime("%H:%M:%S")) + "," +
linkdelay = (self.status.highest_msec.get(highest_msec_key, 0) - master.highest_msec.get(highest_msec_key, 0))*1.0e-3 # noqa
logfile.write(str(time.strftime("%H:%M:%S")) + "," +
str(self.link_label(master)) + "," +
str(master.linknum) + "," +
str(self.status.counters['MasterIn'][master.linknum]) + "," +
Expand All @@ -124,20 +153,20 @@ def idle_task(self):
def complete_serial_ports(self, text):
'''return list of serial ports'''
ports = mavutil.auto_detect_serial(preferred_list=preferred_ports)
return [ p.device for p in ports ]
return [p.device for p in ports]

def complete_hl(self, text):
'''return list of hl options'''
return [ 'on', 'off' ]
return ['on', 'off']

def complete_dl(self, text):
'''return list of datarate_logging options'''
return [ 'on', 'off' ]
return ['on', 'off']

def complete_links(self, text):
'''return list of links'''
try:
ret = [ m.address for m in self.mpstate.mav_master ]
ret = [m.address for m in self.mpstate.mav_master]
for m in self.mpstate.mav_master:
ret.append(m.address)
if hasattr(m, 'label'):
Expand Down Expand Up @@ -192,7 +221,7 @@ def cmd_dl(self, args):
# Open a new file handle (don't append) for logging
with open(self.datarate_logging, 'w') as logfile:
logfile.write("time, linkname, linkid, packetsreceived, bytesreceived, delaysec, lostpercent\n")
elif args[0] == "off":
elif args[0] == "off":
print("Datarate Logging OFF")
self.datarate_logging = None
else:
Expand All @@ -203,7 +232,7 @@ def cmd_hl(self, args):
if len(args) < 1:
print("High latency mode is " + str(self.high_latency))
return
elif args[0] == "on":
elif args[0] == "on":
print("High latency mode ON")
self.high_latency = True
# Tell ArduPilot to start sending HIGH_LATENCY2 messages
Expand All @@ -220,7 +249,7 @@ def cmd_hl(self, args):
0, # param6
0) # param7
return
elif args[0] == "off":
elif args[0] == "off":
print("High latency mode OFF")
self.high_latency = False
self.master.mav.command_long_send(
Expand All @@ -238,12 +267,12 @@ def cmd_hl(self, args):
return
else:
print("usage: hl <on|off>")

def show_link(self):
'''show link information'''
for master in self.mpstate.mav_master:
highest_msec_key = (self.target_system, self.target_component)
linkdelay = (self.status.highest_msec.get(highest_msec_key, 0) - master.highest_msec.get(highest_msec_key, 0))*1.0e-3
linkdelay = (self.status.highest_msec.get(highest_msec_key, 0) - master.highest_msec.get(highest_msec_key, 0))*1.0e-3 # noqa
if master.linkerror:
status = "DOWN"
else:
Expand All @@ -256,20 +285,21 @@ def show_link(self):
# don't have a signing secret
sign_string = ", (no-signing-secret)"
else:
sign_string = ", unsigned %u reject %u" % (master.mav.signing.unsigned_count, master.mav.signing.reject_count)
except AttributeError as e:
sign_string = ", unsigned %u reject %u" % (master.mav.signing.unsigned_count, master.mav.signing.reject_count) # noqa
except AttributeError:
# some mav objects may not have a "signing" attribute
pass
print("link %s %s (%u packets, %u bytes, %.2fs delay, %u lost, %.1f%% loss, rate:%uB/s%s)" % (self.link_label(master),
status,
self.status.counters['MasterIn'][master.linknum],
self.status.bytecounters['MasterIn'][master.linknum].total(),
linkdelay,
master.mav_loss,
master.packet_loss(),
self.status.bytecounters['MasterIn'][master.linknum].rate(),
sign_string))

print("link %s %s (%u packets, %u bytes, %.2fs delay, %u lost, %.1f%% loss, rate:%uB/s%s)" % (
self.link_label(master),
status,
self.status.counters['MasterIn'][master.linknum],
self.status.bytecounters['MasterIn'][master.linknum].total(),
linkdelay,
master.mav_loss,
master.packet_loss(),
self.status.bytecounters['MasterIn'][master.linknum].rate(),
sign_string,
))

def reset_link_stats(self):
'''reset link statistics'''
Expand All @@ -287,7 +317,7 @@ def cmd_alllinks(self, args):
self.cmd_vehicle([str(v)])
self.mpstate.functions.process_stdin(' '.join(args), True)
self.cmd_vehicle([str(saved_target)])

def cmd_link_list(self):
'''list links'''
print("%u links" % len(self.mpstate.mav_master))
Expand Down Expand Up @@ -344,7 +374,7 @@ def link_add(self, descriptor, force_connected=False, retries=3):
baud=self.settings.baudrate,
force_connected=force_connected,
retries=retries)
except Exception as e:
except Exception:
# try the same thing but without force-connected for
# backwards-compatability
conn = mavutil.mavlink_connection(device, autoreconnect=True,
Expand Down Expand Up @@ -411,8 +441,8 @@ def find_link(self, device):
for i in range(len(self.mpstate.mav_master)):
conn = self.mpstate.mav_master[i]
if (str(i) == device or
conn.address == device or
getattr(conn, 'label', None) == device):
conn.address == device or
getattr(conn, 'label', None) == device):
return i
return None

Expand Down Expand Up @@ -497,7 +527,7 @@ def handle_msec_timestamp(self, m, master):
return
sysid = m.get_srcSystem()
compid = m.get_srcComponent()
highest_msec_key = (sysid,compid)
highest_msec_key = (sysid, compid)
highest = master.highest_msec.get(highest_msec_key, 0)
if msec + 30000 < highest:
self.say('Time has wrapped')
Expand All @@ -512,7 +542,7 @@ def handle_msec_timestamp(self, m, master):
master.highest_msec[highest_msec_key] = msec
if msec > self.status.highest_msec.get(highest_msec_key, 0):
self.status.highest_msec[highest_msec_key] = msec
if msec < self.status.highest_msec.get(highest_msec_key, 0) and len(self.mpstate.mav_master) > 1 and self.mpstate.settings.checkdelay:
if msec < self.status.highest_msec.get(highest_msec_key, 0) and len(self.mpstate.mav_master) > 1 and self.mpstate.settings.checkdelay: # noqa
master.link_delayed = True
else:
master.link_delayed = False
Expand Down Expand Up @@ -548,13 +578,12 @@ def report_altitude(self, altitude):
self.status.altitude = altitude
altitude_converted = self.height_convert_units(altitude)
if (int(self.mpstate.settings.altreadout) > 0 and
math.fabs(altitude_converted - self.last_altitude_announce) >=
int(self.settings.altreadout)):
math.fabs(altitude_converted - self.last_altitude_announce) >=
int(self.settings.altreadout)):
self.last_altitude_announce = altitude_converted
rounded_alt = int(self.settings.altreadout) * ((self.settings.altreadout/2 + int(altitude_converted)) / int(self.settings.altreadout))
rounded_alt = int(self.settings.altreadout) * ((self.settings.altreadout/2 + int(altitude_converted)) / int(self.settings.altreadout)) # noqa
self.say("height %u" % rounded_alt, priority='notification')


def emit_accumulated_statustext(self, key, id, pending):
out = pending.accumulated_statustext()
if out != self.status.last_apm_msg or time.time() > self.status.last_apm_msg_time+2:
Expand Down Expand Up @@ -622,7 +651,7 @@ def master_msg_handling(self, m, master):
if m.get_type() == "BAD_DATA":
if self.mpstate.settings.shownoise and mavutil.all_printable(m.data):
out = m.data
if type(m.data) == bytearray:
if isinstance(m.data, bytearray):
out = m.data.decode('ascii')
self.mpstate.console.write(out, bg='red')
return
Expand All @@ -635,14 +664,14 @@ def master_msg_handling(self, m, master):
# keep the pymavlink level target component aligned with the MAVProxy setting
print("change target_component %u" % self.settings.target_component)
master.target_component = self.settings.target_component

mtype = m.get_type()

if self.heartbeat_is_from_autopilot(m):
if self.settings.target_system == 0 and self.settings.target_system != m.get_srcSystem():
self.settings.target_system = m.get_srcSystem()
self.settings.target_component = m.get_srcComponent()
self.say("online system %u" % self.settings.target_system,'message')
self.say("online system %u" % self.settings.target_system, 'message')
for mav in self.mpstate.mav_master:
mav.target_system = self.settings.target_system
mav.target_component = self.settings.target_component
Expand Down Expand Up @@ -670,14 +699,15 @@ def master_msg_handling(self, m, master):
self.set_prompt(self.status.flightmode + "> ")

if master.flightmode != self.status.last_mode_announced and time.time() > self.status.last_mode_announce + 2:
self.status.last_mode_announce = time.time()
self.status.last_mode_announced = master.flightmode
self.say("Mode " + self.status.flightmode)

if m.type in [mavutil.mavlink.MAV_TYPE_FIXED_WING,
mavutil.mavlink.MAV_TYPE_VTOL_DUOROTOR,
mavutil.mavlink.MAV_TYPE_VTOL_QUADROTOR,
mavutil.mavlink.MAV_TYPE_VTOL_TILTROTOR]:
self.status.last_mode_announce = time.time()
self.status.last_mode_announced = master.flightmode
self.say("Mode " + self.status.flightmode)

if m.type in [
mavutil.mavlink.MAV_TYPE_FIXED_WING,
mavutil.mavlink.MAV_TYPE_VTOL_DUOROTOR,
mavutil.mavlink.MAV_TYPE_VTOL_QUADROTOR,
mavutil.mavlink.MAV_TYPE_VTOL_TILTROTOR]:
self.mpstate.vehicle_type = 'plane'
self.mpstate.vehicle_name = 'ArduPlane'
elif m.type in [mavutil.mavlink.MAV_TYPE_GROUND_ROVER,
Expand Down Expand Up @@ -730,7 +760,7 @@ def add_chunk(self, m): # m is a statustext message
self.chunks[chunk_seq] = m.text

if len(m.text) != 50 or mid == 0:
self.expected_count = chunk_seq + 1;
self.expected_count = chunk_seq + 1

def complete(self):
return (self.expected_count is not None and
Expand Down Expand Up @@ -857,10 +887,10 @@ def accumulated_statustext(self):
res = mavutil.mavlink.enums["MAV_MISSION_RESULT"][m.type].name
res = res[12:]
self.mpstate.console.writeln("Got MISSION_ACK: %s: %s" % (t, res))
except Exception as e:
except Exception:
self.mpstate.console.writeln("Got MAVLink msg: %s" % m)
else:
#self.mpstate.console.writeln("Got MAVLink msg: %s" % m)
# self.mpstate.console.writeln("Got MAVLink msg: %s" % m)
pass

self.check_watched_message(m, '<')
Expand Down Expand Up @@ -962,17 +992,17 @@ def master_callback(self, m, master):
target_sysid = self.target_system

# pass to modules
for (mod,pm) in self.mpstate.modules:
for (mod, pm) in self.mpstate.modules:
if not hasattr(mod, 'mavlink_packet'):
continue
# Do not send other-system-or-component heartbeat packets to non-multi-vehicle modules
if not self.message_is_from_primary_vehicle(m) and not mod.multi_vehicle and mtype == 'HEARTBEAT':
continue
# sysid 51/'3' is used by SiK radio for the injected RADIO/RADIO_STATUS mavlink frames.
# In order to be able to pass these to e.g. the graph module, which is not multi-vehicle,
# special handling is needed, so that the module gets both RADIO_STATUS and (single) target
# special handling is needed, so that the module gets both RADIO_STATUS and (single) target
# vehicle information.
if not(sysid == 51 and mtype in radioStatusPackets):
if not (sysid == 51 and mtype in radioStatusPackets):
if not mod.multi_vehicle and sysid != target_sysid:
# only pass packets not from our target to modules that
# have marked themselves as being multi-vehicle capable
Expand Down Expand Up @@ -1013,14 +1043,15 @@ def cmd_vehicle(self, args):
m.target_component = self.mpstate.settings.target_component
if 'HEARTBEAT' in m.messages:
stamp = m.messages['HEARTBEAT']._timestamp
src_system = m.messages['HEARTBEAT'].get_srcSystem()
# src_system = m.messages['HEARTBEAT'].get_srcSystem()
if stamp > best_timestamp:
best_link = i
best_timestamp = stamp
m.link_delayed = False
m.link_delayed = False
self.mpstate.settings.link = best_link + 1
print("Set vehicle %s (link %u)" % (args[0], best_link+1))


def init(mpstate):
'''initialise module'''
return LinkModule(mpstate)

0 comments on commit 06d895e

Please sign in to comment.