MeshNode Usage Guide¶
The MeshNode
class is the primary interface for interacting with a mesh network in pyMC_Core. This guide provides examples and explanations of how to use the MeshNode class for various mesh operations.
Prerequisites¶
Important: Before running examples, ensure you have pyMC_Core installed. On modern Python installations (Ubuntu 22.04+, Debian 12+), you may encounter
externally-managed-environment
errors when installing packages system-wide. Create a virtual environment first:
Available Examples¶
This guide references working examples from the examples/
directory:
send_flood_advert.py
: Broadcast advertisement packets to the entire mesh networksend_direct_advert.py
: Send advertisement packets to specific contactssend_tracked_advert.py
: Send advertisements with location tracking informationsend_text_message.py
: Send encrypted text messages with CRC validationsend_channel_message.py
: Send messages to group channelsping_repeater_trace.py
: Network diagnostics using trace packets
All examples use the common.py
utilities for shared setup and support both Waveshare HAT and uConsole radio configurations.
Running Examples¶
# Run examples directly with default Waveshare radio
python examples/send_flood_advert.py
python examples/send_direct_advert.py
python examples/send_text_message.py
# Run examples with uConsole radio
python examples/send_flood_advert.py uconsole
python examples/send_direct_advert.py uconsole
python examples/send_text_message.py uconsole
Radio Setup¶
pyMC_Core supports multiple LoRa radio hardware configurations. Below are the setup instructions for supported devices.
Waveshare LoRaWAN/GNSS HAT¶
The Waveshare HAT is a popular SX1262-based LoRa module for Raspberry Pi with official documentation available at Waveshare Wiki.
Hardware Setup:
1. Connect the HAT to Raspberry Pi 40PIN GPIO header
2. Ensure SPI interface is enabled in Raspberry Pi configuration
3. Install required GPIO library: sudo apt install python3-rpi.lgpio
Pin Configuration (Raspberry Pi): - SPI Bus: SPI0 (MOSI, MISO, SCLK pins) - CS: GPIO 21 - Reset: GPIO 18 - Busy: GPIO 20 - IRQ (DIO1): GPIO 16 - TXEN: GPIO 6 - RXEN: Connected to DIO2 (not used directly)
from pymc_core.hardware.sx1262_wrapper import SX1262Radio
# Waveshare HAT configuration (matches official pinout)
radio = SX1262Radio(
bus_id=0, # SPI bus 0
cs_id=0, # SPI chip select 0
cs_pin=21, # CS pin (GPIO 21)
reset_pin=18, # Reset pin (GPIO 18)
busy_pin=20, # Busy pin (GPIO 20)
irq_pin=16, # Interrupt pin (GPIO 16)
txen_pin=6, # TX enable pin (GPIO 6)
rxen_pin=-1, # RX enable (-1, connected to DIO2)
frequency=869525000, # 869.525 MHz (EU standard)
tx_power=22, # 22 dBm
spreading_factor=11, # Spreading factor
bandwidth=250000, # 250 kHz
coding_rate=5, # 4/5 coding rate
preamble_length=17 # Preamble length
)
# Initialize the radio
radio.begin()
Hacker Gadgets uConsole RTL-SDR/LoRa/GPS/RTC/USB Hub All-In-One Extension Board¶
The all-in-one extension board with SX1262 LoRa support.
Prerequisites:
- Enable SPI1 in /boot/firmware/config.txt
:
from pymc_core.hardware.sx1262_wrapper import SX1262Radio
# uConsole configuration
radio = SX1262Radio(
bus_id=1, # SPI bus 1
cs_id=0, # SPI chip select 0
cs_pin=-1, # Hardware CS
reset_pin=25, # Reset pin (GPIO 25)
busy_pin=24, # Busy pin (GPIO 24)
irq_pin=26, # Interrupt pin (GPIO 26)
txen_pin=-1, # TX enable (-1 if not used)
rxen_pin=-1, # RX enable (-1 if not used)
frequency=915000000, # 915 MHz (US standard, adjust for your region)
tx_power=22, # 22 dBm
spreading_factor=11, # Spreading factor
bandwidth=250000, # 250 kHz
coding_rate=5, # 4/5 coding rate
preamble_length=17 # Preamble length
)
# Initialize the radio
radio.begin()
Alternative Radio Configuration¶
For custom hardware setups, you can customize the pin configuration:
# Custom pin configuration
radio = SX1262Radio(
cs_pin=21, # Your CS pin
reset_pin=18, # Your reset pin
busy_pin=20, # Your busy pin
irq_pin=16, # Your interrupt pin
txen_pin=6, # TX enable pin (if used)
rxen_pin=-1, # RX enable pin (-1 if not used)
frequency=868000000, # 868 MHz
tx_power=20 # 20 dBm
)
radio.begin()
Radio Configuration Parameters¶
Parameter | Description | Waveshare HAT | uConsole |
---|---|---|---|
bus_id |
SPI bus ID | 0 | 1 |
cs_id |
SPI chip select ID | 0 | 0 |
cs_pin |
Chip select GPIO pin | 21 | -1 |
reset_pin |
Reset GPIO pin | 18 | 25 |
busy_pin |
Busy GPIO pin | 20 | 24 |
irq_pin |
Interrupt GPIO pin | 16 | 26 |
txen_pin |
TX enable GPIO pin | 6 | -1 |
rxen_pin |
RX enable GPIO pin | -1 | -1 |
frequency |
Operating frequency in Hz | 869525000 (EU) | 915000000 (US) |
tx_power |
Transmit power in dBm | 22 | 22 |
spreading_factor |
LoRa spreading factor (7-12) | 11 | 11 |
bandwidth |
Bandwidth in Hz | 250000 | 250000 |
coding_rate |
Coding rate (5=4/5, 6=4/6, etc.) | 5 | 5 |
preamble_length |
Preamble length | 17 | 17 |
Note: Adjust the frequency
parameter based on your regional LoRa regulations (868MHz for EU, 915MHz for US, 433MHz for Asia).
Creating a MeshNode Instance¶
import asyncio
from pymc_core.node.node import MeshNode
from pymc_core.protocol.identity import LocalIdentity
async def create_mesh_node():
# Create local identity (generates new keys if none provided)
identity = LocalIdentity()
# Create mesh node with radio interface
node = MeshNode(
radio=radio, # Your initialized radio instance
local_identity=identity,
config={
"node": {
"name": "MyMeshNode"
}
}
)
# Start the node
await node.start()
return node
# Usage
node = asyncio.run(create_mesh_node())
Sending Messages¶
Direct Text Messages¶
See the working example in examples/send_text_message.py
:
# Key excerpt from send_text_message.py
packet, crc = PacketBuilder.create_text_message(
contact=mock_contact,
local_identity=identity,
message="Hello from PyMC Core! This is a test message",
attempt=0,
message_type="flood"
)
Flood Routing Messages¶
See the working example in examples/send_flood_advert.py
:
# Key excerpt from send_flood_advert.py
advert_packet = PacketBuilder.create_flood_advert(
local_identity=identity,
name="MyNode",
lat=37.7749, # San Francisco latitude
lon=-122.4194, # San Francisco longitude
flags=ADVERT_FLAG_IS_CHAT_NODE
)
Group Communication¶
Channel Messages¶
See the working example in examples/send_channel_message.py
for sending messages to group channels.
Direct Advertisements¶
See the working example in examples/send_direct_advert.py
for sending advertisements to specific contacts.
Telemetry Operations¶
Telemetry functionality is available in the MeshNode API but examples are not yet available in the examples folder. Refer to the API documentation for telemetry methods like send_telemetry_request()
.
Repeater Management¶
Connecting to a Repeater¶
async def connect_to_repeater():
result = await node.send_login(
repeater_name="repeater_01",
password="secure_password"
)
if result["success"]:
print("Successfully logged into repeater")
if result.get("is_admin"):
print("Admin privileges granted")
else:
print(f"Login failed: {result.get('error')}")
Sending Commands to Repeater¶
async def send_repeater_command():
result = await node.send_repeater_command(
repeater_name="repeater_01",
command="status",
parameters=None
)
if result["success"]:
print(f"Repeater response: {result['response']}")
else:
print(f"Command failed: {result.get('error')}")
Protocol Requests¶
async def send_protocol_request():
# Send custom protocol command
result = await node.send_protocol_request(
repeater_name="repeater_01",
protocol_code=0x10, # Custom command code
data=b"configuration_data"
)
if result["success"]:
print(f"Protocol response: {result['response']}")
else:
print(f"Protocol request failed: {result.get('error')}")
Disconnecting from Repeater¶
async def disconnect_from_repeater():
result = await node.send_logout(repeater_name="repeater_01")
if result["success"]:
print("Successfully logged out from repeater")
else:
print(f"Logout failed: {result.get('error')}")
Network Diagnostics¶
Sending Trace Packets¶
See the working example in examples/ping_repeater_trace.py
for sending trace packets to test network connectivity and repeater diagnostics.
# Key excerpt from ping_repeater_trace.py
trace_packet = PacketBuilder.create_trace_packet(
local_identity=identity,
contact=contact,
tag=0x12345678,
auth_code=0xABCD,
flags=0x01
)
Event Handling¶
Setting Up Event Service¶
from pymc_core.node.events import EventService
# Create event service
event_service = EventService()
# Attach to node
node.set_event_service(event_service)
# Listen for mesh events
async def handle_mesh_events():
async for event in event_service.listen():
if event.type == "message_received":
print(f"New message from {event.data.get('sender')}")
elif event.type == "node_discovered":
print(f"New node discovered: {event.data.get('node_id')}")
elif event.type == "telemetry_received":
print(f"Telemetry data received: {event.data}")
# Start event handler
asyncio.create_task(handle_mesh_events())
Configuration¶
Note: Radio configuration (frequency, power, etc.) is handled when creating the SX1262Radio
instance, not in the MeshNode config.
Node Configuration Options¶
config = {
"node": {
"name": "MyMeshNode",
"description": "Primary mesh network node"
},
"network": {
"max_hops": 5,
"timeout": 10.0,
"retry_count": 3
}
}
node = MeshNode(
radio=radio, # Radio configured separately
local_identity=identity,
config=config
)
Advanced Configuration¶
# Custom logging
import logging
custom_logger = logging.getLogger("MyMeshNode")
custom_logger.setLevel(logging.DEBUG)
node = MeshNode(
radio=radio,
local_identity=identity,
logger=custom_logger,
config=config
)
Error Handling¶
Error Handling¶
async def robust_send_message(contact_name: str, message: str):
try:
result = await node.send_text(contact_name, message)
if result["success"]:
print(f"Message sent successfully in {result.get('rtt_ms', 0)}ms")
return True
else:
error_msg = result.get("error", "Unknown error")
print(f"Send failed: {error_msg}")
return False
except RuntimeError as e:
print(f"Contact not found: {e}")
return False
except asyncio.TimeoutError:
print("Message timed out")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False
This guide covers the most common use cases for the MeshNode class. For detailed API documentation, refer to the API Reference.