Cruft / aging off

I’ve wiped my devices and freshly re-installed everything, and they seem to be behaving more reliably.

They’ve had many different versions of the firmware on them, and still had data in their nodeDBs from nodes they haven’t connected to in many weeks. I was also getting tired of clicking through positions/bearings for nodes that were many weeks out of date.

To do this, I wrote a somewhat hoodly Python script to wipe the devices, re-install, and then restore the channel and username settings. The bash scripts included with the firmware are nice, but they don’t do all this, and sometimes I work on Windows, where getting bash and Python to play nice is a pain.

Given the rapid rate at which this project is evolving, it might be a good idea to have an easier way to do this. I played some with using the factory_reset option from the python API, but I don’t think it clears the nodeDB and I don’t think I had things quite right. Is it worth adding the following calls to the API:

  1. Clear entire nodeDB
  2. Clear entries in the nodeDB older than x amount of time
  3. Clear messages
  4. Reset prefs to defaults (I think this is what factory_reset does)
  5. Clear any other state?

I think it would also be good if entries in the nodeDB would age off somehow automatically.

Also, does using esptool erase_flash erase all state on the device? I would assume it does, but when I was experimenting I got myself confused about this.

2 Likes

Which script were you using?

The node information is stored on the flash which is cleared using the device-install.sh script.

I was using my own custom script that wipes everything and restores the channel settings and device name. This seemed less error prone than doing all these steps by hand for all my devices.

#!python3

import os
import sys
import logging
import time
import argparse
import multiprocessing
import base64

import serial.tools.list_ports as list_ports
import esptool
import meshtastic

# Configure root logging at INFO level and create this script's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Seconds to sleep after flashing before re-opening the serial interface.
waitForMT = 3

def wait_for_config(device, sleep=.1, maxsecs=20, attrs=('myInfo', 'nodes', 'radioConfig')):
    """Wait until the device has populated its configuration attributes.

    If the device object provides its own ``waitForConfig`` it is used and its
    return value is passed through; otherwise the attributes are polled here.

    Args:
        device: Interface object to inspect.
        sleep: Seconds to sleep between polls.
        maxsecs: Approximate total number of seconds to keep polling.
        attrs: Attribute names that must all be truthy on ``device``.

    Returns:
        Whatever ``device.waitForConfig`` returns when it exists; otherwise
        True once every attribute is truthy, or False on timeout.
    """
    if hasattr(device, 'waitForConfig'):
        # Pass the library's verdict through so both branches report a result.
        return device.waitForConfig(sleep=sleep, maxsecs=maxsecs, attrs=attrs)

    for _ in range(int(maxsecs/sleep)):
        if all(getattr(device, a, None) for a in attrs):
            return True
        time.sleep(sleep)
    return False

def set_url(device, url):
    """Apply the channel settings encoded in a channel URL to the device.

    Uses ``device.setURL`` when available. Otherwise the text after the last
    ``/#`` is base64url-decoded, parsed into
    ``device.radioConfig.channel_settings`` and written with ``writeConfig``.

    Args:
        device: Interface object to configure.
        url: Channel URL whose fragment holds the base64url-encoded settings.
    """
    if hasattr(device, 'setURL'):
        device.setURL(url)
        return

    encoded = url.split("/#")[-1]
    # The fragment may arrive without its '=' padding, which the decoder
    # rejects; restore the padding before decoding.
    encoded += '=' * (-len(encoded) % 4)
    raw_settings = base64.urlsafe_b64decode(encoded)
    device.radioConfig.channel_settings.ParseFromString(raw_settings)
    device.writeConfig()

def wait_for_url(device, url, sleep=.1, maxsecs=20, initial_delay=5):
    """Poll the device until its channel URL equals ``url``.

    Args:
        device: Interface object exposing ``channelURL``.
        url: Channel URL expected to be reported by the device.
        sleep: Seconds to sleep between polls.
        maxsecs: Approximate total number of seconds to keep polling.
        initial_delay: Seconds to wait before the first poll (presumably to
            give the device time to apply a just-written configuration -
            TODO confirm). A falsy value skips the wait.

    Returns:
        True as soon as the URLs match, False if they never did.
    """
    if initial_delay:
        time.sleep(initial_delay)
    for _ in range(int(maxsecs/sleep)):
        if device.channelURL == url:
            return True
        time.sleep(sleep)
    return False

def set_owner(device, long_name, short_name=None):
    """Set the owner's long and short name on the device.

    Uses ``device.setOwner`` when available. Otherwise a ``ToRadio`` message
    is built here and sent with ``device._sendToRadio``. When no short name is
    given one is derived from the long name: the name itself if it has at
    most three characters, the initials if it has two or more words, else the
    first letter plus the remaining consonants (falling back to the first
    three characters). Short names are truncated to three characters.

    Args:
        device: Interface object to configure.
        long_name: Full owner name, or None to leave it unset.
        short_name: Abbreviated owner name, or None to derive it.
    """
    if hasattr(device, 'setOwner'):
        device.setOwner(long_name, short_name)
        return

    nChars = 3
    minChars = 2
    if long_name is not None:
        long_name = long_name.strip()
        if short_name is None:
            words = long_name.split()
            if len(long_name) <= nChars:
                short_name = long_name
            elif len(words) >= minChars:
                short_name = ''.join(word[0] for word in words)
            else:
                trans = str.maketrans(dict.fromkeys('aeiouAEIOU'))
                short_name = long_name[0] + long_name[1:].translate(trans)
                if len(short_name) < nChars:
                    short_name = long_name[:nChars]
    # The bare name `mesh_pb2` is never imported in this script; reach the
    # protobuf module through the imported `meshtastic` package instead.
    t = meshtastic.mesh_pb2.ToRadio()
    if long_name is not None:
        t.set_owner.long_name = long_name
    if short_name is not None:
        short_name = short_name.strip()
        if len(short_name) > nChars:
            short_name = short_name[:nChars]
        t.set_owner.short_name = short_name
    device._sendToRadio(t)

def get_device_info():
    """Read the owner names and channel URL from the attached device.

    Opens a serial interface, waits for its configuration, reads the values
    and always closes the interface again, even if reading fails.

    Returns:
        Tuple ``(long_name, short_name, url)``.
    """
    # NOTE(review): noProto=True is kept from the original; confirm that the
    # configuration is still received in this mode.
    device = meshtastic.SerialInterface(noProto=True)
    try:
        if wait_for_config(device) is False:
            log.warning('Timed out waiting for the device configuration')
        long_name  = device.getLongName()
        short_name = device.getShortName()
        url        = device.channelURL
    finally:
        # Release the serial port so the flasher can use it afterwards.
        device.close()
        device._disconnected()
    return long_name, short_name, url

def get_device_info_process(queue):
    """Query the device and post ``(long_name, short_name, url)`` on ``queue``."""
    owner_long, owner_short, channel_url = get_device_info()
    info = (owner_long, owner_short, channel_url)
    queue.put(info)

def get_device_info_multiprocess():
    """Run the device query in a spawned child process and return its result.

    Returns:
        The ``(long_name, short_name, url)`` tuple posted by the child.

    Raises:
        RuntimeError: If the child exits without posting a result.
    """
    import queue as queue_errors  # only needed for the Empty exception

    ctx          = multiprocessing.get_context('spawn')
    result_queue = ctx.Queue()
    worker       = ctx.Process(target=get_device_info_process, args=(result_queue,))
    worker.start()
    try:
        # Poll with a timeout so a crashed child cannot block us forever.
        while worker.is_alive():
            try:
                return result_queue.get(timeout=1)
            except queue_errors.Empty:
                continue
        # The child has exited; pick up a result it may have posted just before.
        try:
            return result_queue.get(timeout=1)
        except queue_errors.Empty:
            raise RuntimeError('Device info process exited without returning a result') from None
    finally:
        worker.join()

def flash_device(firmware, sysinfo):
    """Erase the chip's flash and write the system-info and firmware images.

    Serial ports are probed in reverse sorted order until esptool detects a
    chip. The flash is then erased, ``sysinfo`` is written at 0x1000 and
    ``firmware`` at 0x10000, and the device is hard-reset.

    Args:
        firmware: Path of the firmware image.
        sysinfo: Path of the system-info image.

    Raises:
        RuntimeError: If no chip could be detected on any serial port.
    """
    initial_baud = esptool.ESPLoader.ESP_ROM_BAUD
    final_baud   = 921600
    ser_list     = sorted(port.device for port in list_ports.comports())

    # Keep both image files open only for the duration of the flash.
    with open(sysinfo, 'rb') as sysinfo_file, open(firmware, 'rb') as firmware_file:
        args = argparse.Namespace(**{
            'after':             'hard_reset'
            , 'addr_filename': (
                (  0x1000,  sysinfo_file)
                , (0x10000, firmware_file)
            )
            , 'baud':             final_baud
            , 'before':          'default_reset'
            , 'chip':            'auto'
            , 'compress':         None
            , 'encrypt':          False
            , 'erase_all':        False
            , 'flash_freq':      'keep'
            , 'flash_mode':      'keep'
            , 'flash_size':      'detect'
            , 'ignore_flash_encryption_efuse_setting': False
            , 'no_stub':          False
            , 'no_compress':      False
            , 'no_progress':      False
            , 'override_vddsdio': None
            , 'port':             None
            , 'spi_connection':   None
            , 'trace':            False
            , 'verify':           False
        })

        esp = None
        log.info('Searching for device')
        for each_port in reversed(ser_list):
            try:
                log.info(f'Serial port {each_port}')
                esp = esptool.ESPLoader.detect_chip(each_port, initial_baud, args.before, args.trace)
                break
            except Exception as exc:
                # A port without a chip is expected; record why it was skipped.
                log.debug(f'No chip detected on {each_port}: {exc}')

        if esp is None:
            raise RuntimeError(f'No ESP device found on serial ports: {ser_list}')

        try:
            esptool.read_mac(esp, args)
            esp = esp.run_stub()
            esp.change_baud(final_baud)

            log.info('Configuring flash size...')
            esptool.detect_flash_size(esp, args)
            if args.flash_size != 'keep':  # TODO: should set this even with 'keep'
                esp.flash_set_parameters(esptool.flash_size_bytes(args.flash_size))

            log.info('Erasing flash')
            esptool.erase_flash(esp, args)

            log.info('Writing flash')
            esptool.write_flash(esp, args)

            log.info('Reseting device')
            esp.hard_reset()
        finally:
            # Always release the serial port, even when flashing failed.
            esp._port.close()


# Script entry point: remember the device's names and channel URL, wipe and
# re-flash it, then restore the remembered settings.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        log.error(f'No firmware specified: {sys.argv}')
        sys.exit(1)

    firmware =  sys.argv[1]
    # Default to the system-info.bin that sits next to the firmware image.
    sysinfo  = (sys.argv[2] if len(sys.argv) > 2 else os.path.join(os.path.dirname(firmware), 'system-info.bin'))

    long_name, short_name, url = get_device_info()

    log.info(f'firmware:  {firmware} sysinfo: {sysinfo}')
    log.info(f'long_name: {long_name} short_name: {short_name}')
    log.info(f'url:       {url}')

    flash_device(firmware, sysinfo)

    # Wait after the reset before opening the serial interface again.
    time.sleep(waitForMT)

    device = meshtastic.SerialInterface()
    try:
        wait_for_config(device)

        log.info(f'Setting long and short name: {long_name} {short_name}')
        set_owner(device, long_name, short_name)

        log.info(f'Setting url: {url}')
        set_url(device, url)

        res = wait_for_url(device, url)
        log.info(f'{res}: {device.channelURL}')
    finally:
        # Close the interface even if restoring the settings failed.
        log.info('Closing device')
        device.close()
4 Likes

Right on! That looks rather complete!

1 Like

Very nice script to clean up a node and, I guess, to update firmware as well. I am trying this to update my node from 1.1.23 to 1.1.32 and placed firmware-tlora-v1-EU865-1.1.32.bin in the same folder where the script resides. I ran it but got this error and the task could not complete:
site-packages\meshtastic\__init__.py", line 294, in channelURL
bytes = self.radioConfig.channel_settings.SerializeToString()
AttributeError: 'NoneType' object has no attribute 'channel_settings'

Probably I misunderstood something; I’ll be grateful if you could help.

I have a new “better” version of this script. I had some issues with it hanging when attempting to close/destroy the meshtastic.SerialInterface() instance on Windows. I tried some tricks with running the various steps in a separate process/thread, but it didn’t seem to help. It looks like with the latest meshtastic python this is no longer an issue (at least not on my system).

It has more options, and should (not well tested) restore more custom settings if you want to fully wipe and re-install your device. On Windows, I use it from a cmd prompt that has python in its path (I use an anaconda cmd window or git bash w/ my python added to its path). Something like this:

python flash_meshtastic.py firmware-xxxx.bin

If the system-info.bin and spiffs-*.bin files are in the same dir as the firmware file, it should find them. If you add the --erase option, it should fully wipe the device before installing/reinstalling. To see the built-in help:

python flash_meshtastic.py --help

#!/usr/bin/python3

import os
import logging
import time
import argparse
import codecs
from glob import glob

import esptool

import functools
import multiprocessing as mp

try:
    import meshtastic
    haveMeshtastic = True
except ModuleNotFoundError:
    haveMeshtastic = False

# Configure root logging at INFO level and create this script's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Default number of seconds to sleep before each attempt to restore settings
# (also the default of the --sleep command line option).
sleepSecs = 4

def from_string(valstr):
    """Convert a command line string to a bool, int or float when possible.

    Recognised boolean words (case-insensitive) win first; then an integer
    parse is attempted, then a float parse. Anything else is returned as is.
    """
    lowered = valstr.lower()
    if lowered in ('t', 'true', 'yes', 'on'):
        return True
    if lowered in ('f', 'false', 'no', 'off'):
        return False

    for convert in (int, float):
        try:
            return convert(valstr)
        except ValueError:
            continue
    return valstr

#mp.set_start_method('spawn')
def run_in_process(func):
    """Decorator that runs ``func`` in a child process and returns its result.

    The wrapper understands two private keyword arguments: ``__queue`` (a
    queue on which the result is posted) and ``__call_func`` (run ``func``
    directly; defaults to True exactly when a queue was supplied). Without
    them the wrapper starts a child process targeting itself with a fresh
    queue, waits for the result and joins the child.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        reply_to = kwargs.pop('__queue', None)
        run_here = kwargs.pop('__call_func', (reply_to is not None))

        if not run_here:
            # Parent side: delegate to a child process and wait for its answer.
            #ctx = mp.get_context('spawn')
            channel      = mp.Queue()
            child_kwargs = {'__queue': channel, **kwargs}
            child        = mp.Process(target=wrapper, args=args, kwargs=child_kwargs)
            child.start()
            outcome      = channel.get()
            child.join()
            return outcome

        # Direct call (child side when a queue was handed over).
        outcome = func(*args, **kwargs)
        if reply_to is not None:
            reply_to.put(outcome)
            reply_to.close()
        return outcome
    return wrapper

#@run_in_process
def get_device_info(**kwargs):
    """Query the attached device for the settings to preserve across a flash.

    Args:
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        Dict holding the truthy values among ``url``, ``long_name``,
        ``short_name``, ``region``, ``hw_model``, ``regionVersion``,
        ``regionCode`` and ``radioConfig`` (serialized bytes). An empty dict
        is returned when meshtastic is unavailable or the query fails.
    """
    if not haveMeshtastic:
        return {}

    try:
        with meshtastic.SerialInterface() as device:
            device.waitForConfig()

            if hasattr(device, 'myInfo'):
                log.debug(f'Device.myInfo:\n{device.myInfo}')
            else:
                log.debug(f'Device:\n{device}')

            result = {
                'url'         : getattr(device, 'channelURL', None)
                , 'long_name' : device.getLongName()
                , 'short_name': device.getShortName()
            }

            my_info = getattr(device, 'myInfo', None)
            if my_info is not None:
                for attr in ('region', 'hw_model'):
                    result[attr] = getattr(my_info, attr, None)

            # The region is only split when it has exactly the form
            # '<version>-<code>'; anything else is left unsplit.
            region = result.get('region', None)
            if isinstance(region, str):
                parts = region.split('-')
                if len(parts) == 2:
                    result['regionVersion'], result['regionCode'] = parts
                else:
                    log.debug(f'Unable to split region "{region}" into version and code')

            # The attribute can exist but still be None; copying from None
            # would raise and throw away everything collected so far.
            radio_config = getattr(device, 'radioConfig', None)
            if radio_config is not None:
                rc = meshtastic.mesh_pb2.RadioConfig()
                rc.CopyFrom(radio_config)
                result['radioConfig'] = rc.SerializeToString()

            return {k: v for k, v in result.items() if v}
    except Exception as ex:
        # Best effort: flashing can proceed without the old settings.
        log.warning(f'Unable to get device info due to exception {ex}')
        return {}

#@run_in_process
def restore_device_info(deviceInfo, sleepSecs=sleepSecs, dryrun=False, maxiter=10, **kwargs):
    """Write owner names, preferences and channel URL back to the device.

    Each attempt sleeps ``sleepSecs`` seconds, opens a serial interface and
    applies the settings. A failed attempt is logged and retried, up to
    ``maxiter`` attempts in total.

    Args:
        deviceInfo: Dict of settings; keys read here are ``url``,
            ``long_name``, ``short_name``, ``radioConfig``, ``no_wifi``,
            ``wifi``, ``wifi_ssid``, ``wifi_password``, ``set`` and ``setstr``.
        sleepSecs: Seconds to sleep before every attempt (number or numeric
            string), or None for no sleep.
        dryrun: When true, do nothing.
        maxiter: Maximum number of attempts.
        **kwargs: Accepted for call compatibility; not used.
    """
    if not deviceInfo or not haveMeshtastic:
        return

    if dryrun:
        return

    if sleepSecs is not None:
        # The command line delivers this value as a string; time.sleep()
        # would reject it on every attempt.
        try:
            sleepSecs = float(sleepSecs)
        except (TypeError, ValueError):
            log.error(f'Invalid sleep interval {sleepSecs!r}; not restoring settings')
            return

    for iiter in range(maxiter):
        try:
            if sleepSecs is not None:
                time.sleep(sleepSecs)

            with meshtastic.SerialInterface() as device:
                device.waitForConfig()

                url        = deviceInfo.get('url',        None)
                long_name  = deviceInfo.get('long_name',  None)
                short_name = deviceInfo.get('short_name', None)

                if long_name is not None or short_name is not None:
                    log.info(f'Setting long and short name: {long_name} {short_name}')
                    device.setOwner(long_name, short_name)

                if hasattr(device, 'radioConfig') and 'radioConfig' in deviceInfo:
                    rc = meshtastic.mesh_pb2.RadioConfig()
                    rc.ParseFromString(deviceInfo['radioConfig'])

                    # NOTE(review): the overrides below are applied to this
                    # parsed copy, while writeConfig() is called on the
                    # device; confirm they actually reach the radio.
                    prefs = rc.preferences
                    for attr in ('no_wifi', 'wifi', 'wifi_ssid', 'wifi_password'):
                        val = deviceInfo.get(attr, None)
                        if val is None:
                            continue
                        log.info(f'Setting {attr}: {val}')
                        setattr(prefs, attr, val)

                    for name, val in deviceInfo.get('set', ()):
                        setattr(prefs, name, from_string(val))

                    for name, val in deviceInfo.get('setstr', ()):
                        try:
                            setattr(prefs, name, val)
                        except TypeError:
                            if not isinstance(val, str):
                                raise
                            else:
                                # Retry with the string decoded as hex bytes.
                                setattr(prefs, name, codecs.decode(val, 'hex'))

                    device.writeConfig()

                if url is not None:
                    log.info(f'Setting url: {url}')
                    device.setURL(url)

            return
        except Exception as ex:
            if iiter < maxiter-1:
                log.warning(f'Received exception {ex} during attempt {iiter+1} to restore/set settings; trying again')
            else:
                log.error(f'Unable to restore/set settings due to exception {ex}')

#@run_in_process
def flash_device(flashMap, port=None, eraseFirst=False, baud=921600, dryrun=False, flashProcess=True, **kwargs):
    """Write every image in ``flashMap`` to the device with esptool.

    One esptool ``write_flash`` invocation is issued per image, in the
    mapping's iteration order.

    Args:
        flashMap: Mapping of image filename to flash address (hex string or int).
        port: Serial port passed to esptool, or None to omit ``--port``.
        eraseFirst: When true, add ``--erase-all`` to the first invocation only.
        baud: Baud rate passed to esptool, or None to omit ``--baud``.
        dryrun: When true, only log the esptool arguments.
        flashProcess: When true, run esptool as a separate process; otherwise
            call ``esptool.main`` in this process.
        **kwargs: Accepted for call compatibility; not used.
    """
    import subprocess
    import sys

    baseargs = []
    if port is not None:
        baseargs += ['--port', f'{port}']
    if baud is not None:
        baseargs += ['--baud', f'{baud}']

    for i, (filename, addr) in enumerate(flashMap.items()):
        addr_str = f'{addr}' if isinstance(addr, str) else f'0x{addr:x}'
        args = baseargs + ['write_flash', addr_str, filename]

        if i == 0 and eraseFirst:
            args += ['--erase-all']

        if dryrun:
            log.info(f'esptool args: {args}')
        elif flashProcess:
            # An argument list (no shell) keeps paths with spaces intact, and
            # sys.executable reuses the interpreter that has esptool installed.
            cmd = [sys.executable or 'python', '-m', 'esptool'] + args
            cmd_text = ' '.join(cmd)
            log.info(f'Running in separate process: {cmd_text}')
            completed = subprocess.run(cmd)
            if completed.returncode != 0:
                log.error(f'esptool exited with status {completed.returncode} while writing {filename}')
        else:
            log.info(f'Running in python {args}')
            esptool.main(args)

def findFirmware(dirname, deviceInfo=None):
    """Find a firmware image in ``dirname`` matching the device's model and region.

    Files are matched against ``firmware-<hw_model>-<regionCode>-*.bin``. When
    several match, the one whose file name carries the highest numbers (i.e.
    the highest version) is chosen.

    Args:
        dirname: Directory to search.
        deviceInfo: Dict that must contain ``hw_model`` and ``regionCode``.

    Returns:
        Path of the selected firmware file, or None if nothing suitable was found.
    """
    import re

    if deviceInfo is None:
        deviceInfo = {}

    if 'hw_model' not in deviceInfo or 'regionCode' not in deviceInfo:
        log.debug('Unable to get hw_model and/or region; unable to find firmware')
        return None

    globptn = os.path.join(dirname, f'firmware-{deviceInfo["hw_model"]}-{deviceInfo["regionCode"]}-*.bin')

    log.info(f'Globbing for firmwares with pattern: {globptn}')
    candidates = glob(globptn)

    if not candidates:
        log.info(f'No candidates found')
        return None

    def version_key(path):
        # Compare the numbers in the file name numerically so 1.1.32 beats 1.1.9.
        return [int(number) for number in re.findall(r'\d+', os.path.basename(path))]

    # glob() returns matches in arbitrary order; choose deterministically.
    fw = max(candidates, key=version_key)
    log.debug(f'Candidates: {candidates}')
    log.info(f'Selecting {fw} from {len(candidates)} candidates')
    return fw

def main(fwDir=os.curdir):
    """Command line entry point: query the device, flash it, restore settings.

    Unrecognised positional arguments are treated as the firmware file (if
    they name a file) or the firmware directory (if they name a directory).

    Args:
        fwDir: Default directory to search for firmware images.
    """
    defSIAddr   = '0x1000'
    defFWAddr   = '0x10000'
    defSFAddr   = '0x00390000'

    parser      = argparse.ArgumentParser()

    parser.add_argument( '--port',          help='Set serial port' )

    parser.add_argument( '--erase',         help='Erase flash before writing',          default=False,  action='store_true'  )
    parser.add_argument( '--no-erase',      help='Don\'t erase flash before writing',   dest='erase',   action='store_false' )

    parser.add_argument( '--auto',          help='Automatically search for sysinfo and spiffs', default=True, action='store_true'  )
    parser.add_argument( '--no-auto',       help='Don\'t auto search for sysinfo and spiffs',   dest='auto',  action='store_false' )

    parser.add_argument( '--flash-process',    help='Flash in separate process',        default=False,        action='store_true'  )
    parser.add_argument( '--no-flash-process', help='Don\'t flash in separate process', dest='flash_process', action='store_false' )

    parser.add_argument( '--firmware-dir',  help='Specify dir to search for firmware', default=fwDir )

    parser.add_argument( '--firmware',      help='Specify firmware file to flash' )
    parser.add_argument( '--sysinfo',       help='Specify system info file to flash' )
    parser.add_argument( '--spiffs',        help='Specify spiffs file to flash' )

    parser.add_argument( '--firmware-addr', help=f'Specify firmware address (default:{defFWAddr})',    default=defFWAddr )
    parser.add_argument( '--sysinfo-addr',  help=f'Specify system info address (default:{defSIAddr})', default=defSIAddr )
    parser.add_argument( '--spiffs-addr',   help=f'Specify spiffs address (default:{defSFAddr})',      default=defSFAddr )

    parser.add_argument( '--debug',         help='Enable debug logging',                default=False, action='store_true' )

    parser.add_argument( '--dryrun',        help='Don\'t flash or restore',             default=False, action='store_true'  )
    parser.add_argument( '--no-dryrun',     help='Do flash or restore',                 dest='dryrun', action='store_false' )

    parser.add_argument( '--query',         help='Query device for meshtastic info',        default=True, action='store_true'  )
    parser.add_argument( '--no-query',      help='Don\'t query device for meshtastic info', dest='query', action='store_false' )

    if haveMeshtastic:
        parser.add_argument( '--restore',   help='Restore/make settings changes',       default=True,   action='store_true'  )
        parser.add_argument( '--no-restore',help='Don\'t restore/make settings changes',dest='restore', action='store_false' )

        # Parsed as a number so it can be handed straight to time.sleep().
        parser.add_argument( '--sleep',     help='Seconds to sleep before restoring settings', default=sleepSecs, type=float )

        parser.add_argument( '--url',       help='Specify meshtastic URL' )
        parser.add_argument( '--name',      help='Specify meshtastic long name',  dest='long_name'  )
        parser.add_argument( '--shortname', help='Specify meshtastic short name', dest='short_name' )

        # NOTE(review): --no-wifi stores False into 'no_wifi', which is later
        # written to a preference of the same name; confirm the polarity.
        parser.add_argument( '--wifi',      help='Enable wifi AP mode',                        default=None, action='store_true'  )
        parser.add_argument( '--no-wifi',   help='Don\'t enable wifi AP mode', dest='no_wifi', default=None, action='store_false' )

        parser.add_argument( '--wifi-ssid',    help='Set wifi SSID',     dest='wifi_ssid' )
        parser.add_argument( '--wifi-password',help='Set wifi password', dest='wifi_password' )

        parser.add_argument( '--set',       help='Set numeric parameter', nargs=2, action='append' )
        parser.add_argument( '--setstr',    help='Set string parameter',  nargs=2, action='append' )

    args, unknown_args = parser.parse_known_args()

    for arg in unknown_args:
        if os.path.isfile(arg):
            args.firmware = arg
        elif os.path.isdir(arg):
            args.firmware_dir = arg
        else:
            log.error(f'Unknown arg: "{arg}" is not a firmware file or firmware directory!')
            return

    if args.debug:
        # basicConfig() was already called at import time, so calling it again
        # would do nothing; lower the root logger's level directly instead.
        logging.getLogger().setLevel(logging.DEBUG)

    deviceInfo = get_device_info() if args.query else {}

    log.debug(f'Retrieved device info:\n{deviceInfo}')

    if args.firmware is None and args.firmware_dir is not None:
        log.info('Searching for firmware')
        args.firmware = findFirmware(os.path.abspath(args.firmware_dir), deviceInfo)

    if args.firmware is None:
        log.warning('No firmware; will not flash')
        flashMap = {}
    else:
        flashMap = { args.firmware : args.firmware_addr }

    # Companion images are looked up next to the firmware file when there is
    # one, otherwise in the firmware directory.
    if args.firmware is not None:
        fwDir = os.path.dirname(os.path.abspath(args.firmware))
    elif args.firmware_dir is not None:
        fwDir = os.path.abspath(args.firmware_dir)

    # An explicitly given image is always flashed; auto-discovery only fills
    # in an image that was not specified.
    if args.sysinfo is not None:
        flashMap[args.sysinfo] = args.sysinfo_addr
    elif args.auto:
        sysinfo = os.path.join(fwDir, 'system-info.bin')
        if os.path.exists(sysinfo):
            args.sysinfo = sysinfo
            flashMap[args.sysinfo] = args.sysinfo_addr

    if args.spiffs is not None:
        flashMap[args.spiffs] = args.spiffs_addr
    elif args.auto:
        spiffs = glob(os.path.join(fwDir, 'spiffs-*.bin'))
        if spiffs:
            args.spiffs = spiffs[0]
            flashMap[args.spiffs] = args.spiffs_addr

    # Command line values override whatever was read from the device. These
    # options only exist when meshtastic is importable, hence getattr().
    for attr in (
        'url'
        , 'long_name'
        , 'short_name'
        , 'wifi'
        , 'no_wifi'
        , 'wifi_ssid'
        , 'wifi_password'
        , 'set'
        , 'setstr'
    ):
        val = getattr(args, attr, None)
        if val is not None:
            deviceInfo[attr] = val

    def makeStr(s):
        if isinstance(s, str):
            return s
        else:
            return f'0x{s:x}'

    log.info(f'sysinfo:  {args.sysinfo} @ {makeStr(args.sysinfo_addr)}'  )
    log.info(f'spiffs:   {args.spiffs} @ {makeStr(args.spiffs_addr)}'    )
    log.info(f'firmware: {args.firmware} @ {makeStr(args.firmware_addr)}')

    if deviceInfo:
        log.info(f'long_name: {deviceInfo.get("long_name", None)} short_name: {deviceInfo.get("short_name", None)}')
        log.info(f'url:       {deviceInfo.get("url", None)}')

    log.info(f'Port: {args.port}')
    log.info(f'Flash erase: {args.erase}')
    log.debug(f'Flash map:\n{flashMap}')
    log.info(f'Device info:\n{deviceInfo}')

    if flashMap:
        flash_device(flashMap, port=args.port, eraseFirst=args.erase, dryrun=args.dryrun, flashProcess=args.flash_process)

    # --restore and --sleep are not defined without meshtastic.
    if getattr(args, 'restore', False):
        restore_device_info(deviceInfo, sleepSecs=getattr(args, 'sleep', sleepSecs), dryrun=args.dryrun)

# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()
1 Like

I also tried to make it so that if you are updating a device that already has an older meshtastic firmware on it, you can just specify the directory containing the contents of the release zip file and it will automatically find the firmware that matches your current hw_model and regionCode settings. However, this feature is not well tested.

I thought a python script might be a little more cross platform than the bash scripts included in the release zip. I don’t always have a working bash shell handy when I’m on Windows. Maybe others can find this script useful?

python flash_meshtastic.py /path/to/firmware-1.1.42/

1 Like