60 lines
2.0 KiB
Python

import argparse
import time
import serial.tools.list_ports
from pymodbus.client import ModbusSerialClient
from src.config import MODBUS_CONFIG, MAX_RECONNECTION_ATTEMPTS, INITIAL_BACKOFF_TIME
def get_default_com_port() -> str:
    """
    Get the first available COM port.

    Returns:
        str: The ``device`` name of the first port reported by
            ``serial.tools.list_ports.comports()``.

    Raises:
        IndexError: If no serial ports are reported on this machine.
    """
    # Only the first entry is needed, so avoid materializing the whole list.
    first_port = next(iter(serial.tools.list_ports.comports()), None)
    if first_port is None:
        # Kept as IndexError (what the bare ``ports[0]`` lookup used to raise)
        # so existing handlers still match, but with an actionable message
        # instead of "list index out of range".
        raise IndexError(
            "No serial (COM) ports were detected; connect a device or "
            "pass --com_port explicitly"
        )
    return first_port.device
def parse_commandline_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """
    Parse command line arguments.

    Args:
        argv: Argument strings to parse. When None (the default), argparse
            reads them from ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: Parsed arguments with ``com_port`` (str),
            ``slave_id`` (int) and ``cycle_time`` (float).

    Raises:
        IndexError: Propagated from ``get_default_com_port()`` when
            ``--com_port`` is omitted and no port can be auto-detected.
    """
    parser = argparse.ArgumentParser(description='DN23E08 Python Modbus Example')
    # The default is resolved lazily after parsing (see below). Calling
    # get_default_com_port() here would enumerate ports on every run and
    # crash --help / explicit --com_port invocations when none are detected.
    parser.add_argument('--com_port', type=str, default=None,
                        help='COM port for the Modbus connection')
    parser.add_argument('--slave_id', type=int, default=1,
                        help='Slave ID for the Modbus device')
    parser.add_argument('--cycle_time', type=float, default=0.05,
                        help='Cycle time in seconds for the main loop')
    args = parser.parse_args(argv)
    if args.com_port is None:
        # No port given on the command line: fall back to the first one found.
        args.com_port = get_default_com_port()
    return args
def connect_modbus_client(args: argparse.Namespace) -> ModbusSerialClient:
    """
    Connect to the Modbus client with exponential backoff.

    The client is built from ``args.com_port`` plus the keyword settings in
    ``MODBUS_CONFIG``. Up to ``MAX_RECONNECTION_ATTEMPTS`` connection attempts
    are made; after failed attempt number ``k`` (0-based) the function waits
    ``INITIAL_BACKOFF_TIME * 2 ** k`` seconds before trying again. No wait is
    performed after the final failed attempt.

    Args:
        args (argparse.Namespace): Parsed command line arguments; only
            ``com_port`` is read here.

    Returns:
        ModbusSerialClient: Connected Modbus client.

    Raises:
        ConnectionError: If unable to connect after maximum attempts.
    """
    client = ModbusSerialClient(
        port=args.com_port,
        **MODBUS_CONFIG
    )
    for attempt in range(MAX_RECONNECTION_ATTEMPTS):
        if client.connect():
            # NOTE(review): fixed 3 s pause after a successful connect --
            # presumably gives the device time to become ready once the port
            # is opened; confirm against the hardware before changing.
            time.sleep(3)
            return client
        if attempt + 1 >= MAX_RECONNECTION_ATTEMPTS:
            # Last attempt failed: there is nothing left to retry, so do not
            # announce a retry or sleep through the longest backoff interval.
            print(f"Connection attempt {attempt + 1} failed. No attempts remaining.")
            break
        backoff_time = INITIAL_BACKOFF_TIME * (2 ** attempt)
        print(f"Connection attempt {attempt + 1} failed. Retrying in {backoff_time} seconds...")
        time.sleep(backoff_time)
    raise ConnectionError("Failed to connect to Modbus client after maximum attempts")