Continuous message sending through terminal

This is the (hacked-together) code I use to send the current time over the mesh every 30 seconds. It uses timer threads, so other code can run between sends. Just replace the body of def sendText(): with whatever you want to do.

import meshtastic
from pubsub import pub
import threading
import time
from time import sleep
import datetime

def onReceive(packet, interface): # called when a packet arrives
  """Print a received packet to stdout.

  packet: the received packet; it is rendered with its default formatting.
  interface: the interface passed by the publisher; not used here.
  """
  message = "Received: {}".format(packet)
  print(message)

def onConnection(interface, topic=pub.AUTO_TOPIC): # called when we (re)connect to the radio
  """Start the periodic sender when a connection to the radio is established.

  interface: the interface passed by the publisher; not used here.
  topic: the pubsub topic, filled in automatically via pub.AUTO_TOPIC.

  The running timer is remembered on the function object so that a later
  (re)connect replaces it instead of adding a second one.
  """
  print ("starting...")
  # This callback runs again on every reconnect. Without stopping the timer
  # from the previous connection, each reconnect would add another sender
  # and the message would go out several times per interval.
  previous = getattr(onConnection, "_timer", None)
  if previous is not None:
    previous.stop()
  onConnection._timer = RepeatedTimer(30, sendText) # no need of rt.start()

def sendText(): # called every x seconds
  """Send the current local time (HH:MM:SS) as a text message and log it.

  Uses the module-level `interface` object to send the message.
  """
  now = datetime.datetime.now()
  stamp = now.strftime("%H:%M:%S")
  interface.sendText(stamp)
  print(f"Message sent: {stamp}")

# Register the pubsub callbacks before the interface is created, presumably so
# that events published while connecting are not missed.
pub.subscribe(onReceive, "meshtastic.receive")
pub.subscribe(onConnection, "meshtastic.connection.established")
# By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
# NOTE(review): RepeatedTimer is defined further down the file; if onConnection
# runs before that class statement has executed it will raise NameError --
# consider moving the class above this line.
# sendText() reads this module-level `interface` name.
interface = meshtastic.SerialInterface()

class RepeatedTimer: # Timer helper class
  """Call ``function(*args, **kwargs)`` every ``interval`` seconds.

  Each run is scheduled with a one-shot ``threading.Timer``. The timer is
  started by the constructor, so no explicit ``start()`` call is needed.
  Runs are scheduled against an absolute ``next_call`` timestamp rather than
  "interval after the last run", so small delays do not accumulate.

  interval: seconds between calls.
  function: callable to invoke.
  *args, **kwargs: passed through to ``function`` on every call.

  No locking is used; calling ``stop()`` at the very moment the timer fires
  may not prevent the next run from being scheduled.
  """

  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    """Timer callback: schedule the next run, then invoke the function."""
    self.is_running = False
    # Re-arm before calling the function so an exception raised by it does
    # not end the repetition.
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    """Schedule the next call unless one is already pending."""
    if self.is_running:
      return
    now = time.time()
    self.next_call += self.interval
    if self.next_call < now:
      # The slot is already in the past (restart after stop(), or a stall
      # longer than one interval). Restart the cadence from now instead of
      # firing back-to-back until the schedule has caught up.
      self.next_call = now + self.interval
    self._timer = threading.Timer(self.next_call - now, self._run)
    self._timer.start()
    self.is_running = True

  def stop(self):
    """Cancel the pending call, if any, and mark the timer as stopped."""
    if self._timer is not None:
      self._timer.cancel()
    self.is_running = False
6 Likes