Documentation and cleanup

This commit is contained in:
Geens 2024-08-03 21:30:57 +02:00
parent deed622dfe
commit 4329650116
7 changed files with 149 additions and 51 deletions

68
main.py
View File

@ -1,63 +1,51 @@
from pymodbus.client import ModbusSerialClient
import time
import traceback
import argparse
import serial.tools.list_ports
from src.hardware import Hardware
from src.logic import loop
def get_first_available_com_port():
ports = list(serial.tools.list_ports.comports())
return ports[0].device if ports else 'COM3'
from src.util import parse_commandline_args, connect_modbus_client
from src.config import MAX_RECONNECTION_ATTEMPTS, INITIAL_BACKOFF_TIME
def main():
parser = argparse.ArgumentParser(description='UNO Relay Modbus Controller')
parser.add_argument('--com_port', type=str, default=get_first_available_com_port(),
help='COM port for the Modbus connection')
parser.add_argument('--slave_id', type=int, default=1,
help='Slave ID for the Modbus device')
parser.add_argument('--cycle_time', type=float, default=0.05,
help='Cycle time in seconds for the main loop')
args = parser.parse_args()
com_port = args.com_port
slave_id = args.slave_id
cycle_time = args.cycle_time
args = parse_commandline_args()
reconnection_attempts = 0
while True:
try:
client = ModbusSerialClient(
port=com_port,
baudrate=57600,
bytesize=8,
parity='N',
stopbits=1,
timeout=1
)
client.connect()
time.sleep(3)
hardware = Hardware(client, slave_id)
client = connect_modbus_client(args)
reconnection_attempts = 0 # Reset attempts on successful connection
hardware = Hardware(client, args.slave_id)
while True:
start_time = time.time()
hardware.update()
hardware.sync_hardware_state()
loop(hardware)
execution_time = time.time() - start_time
if execution_time > cycle_time:
print(f"Warning: Loop execution time ({execution_time:.4f}s) exceeded cycle time ({cycle_time:.4f}s)")
sleep_time = max(0, cycle_time - execution_time)
if execution_time > args.cycle_time:
print(f"Warning: Loop execution time ({execution_time:.4f}s) exceeded cycle time ({args.cycle_time:.4f}s)")
sleep_time = max(0, args.cycle_time - execution_time)
time.sleep(sleep_time)
except KeyboardInterrupt:
print("Stopped by user")
break
except ConnectionError as e:
print(f"Connection error: {e}")
break # Exit if we've already tried the maximum number of reconnection attempts
except Exception as e:
print(f"Error: {e}")
print(traceback.format_exc())
time.sleep(5)
reconnection_attempts += 1
if reconnection_attempts >= MAX_RECONNECTION_ATTEMPTS:
print("Maximum reconnection attempts reached. Exiting.")
break
backoff_time = INITIAL_BACKOFF_TIME * (2 ** reconnection_attempts)
print(f"Attempting to reconnect in {backoff_time} seconds...")
time.sleep(backoff_time)
finally:
if 'client' in locals():
client.close()

View File

@ -1,3 +1,4 @@
from .hardware import Hardware
from .logic import loop
from .buttons_timers_and_relays import ButtonsTimersAndRelays
from .buttons_timers_and_relays import ButtonsTimersAndRelays
from .util import parse_commandline_args, connect_modbus_client

View File

@ -1,8 +1,14 @@
from enum import Enum
import time
from src.hardware import Hardware
class ButtonsTimersAndRelays:
"""
Manages the state machine for buttons, timers, and relays.
"""
class State(Enum):
"""Enumeration of possible states for the state machine."""
RELAY_0_LOW = 0
RELAY_0_HIGH = 1
BUTTON_1_PRESSED = 2
@ -11,11 +17,18 @@ class ButtonsTimersAndRelays:
RELAY_1_HIGH_2 = 5
def __init__(self):
"""Initialize the ButtonsTimersAndRelays object."""
self.current_state = self.State.RELAY_0_LOW
self.state_change_time = time.time()
self.time_in_current_state = 0
def loop(self, hardware):
def update_state_machine(self, hardware: Hardware) -> None:
"""
Execute one iteration of the state machine loop.
Args:
hardware (Hardware): The hardware interface object.
"""
next_state = self.current_state
# State transitions

11
src/config.py Normal file
View File

@ -0,0 +1,11 @@
# config.py
MODBUS_CONFIG = {
'baudrate': 57600,
'bytesize': 8,
'parity': 'N',
'stopbits': 1,
'timeout': 1
}
MAX_RECONNECTION_ATTEMPTS = 5
INITIAL_BACKOFF_TIME = 1 # seconds

View File

@ -1,7 +1,18 @@
from pymodbus.exceptions import ModbusException
from pymodbus.client import ModbusSerialClient
class Hardware:
def __init__(self, client, slave_id):
"""
Represents the hardware interface for the Arduino Python Modbus Example.
"""
def __init__(self, client: ModbusSerialClient, slave_id: int):
"""
Initialize the Hardware object.
Args:
client (ModbusSerialClient): The Modbus client for communication.
slave_id (int): The slave ID of the Modbus device.
"""
self.client = client
self.slave_id = slave_id
self.relay_0 = False
@ -9,7 +20,10 @@ class Hardware:
self.button_0 = False
self.button_1 = False
def update(self):
def sync_hardware_state(self) -> None:
"""
Update the hardware state by writing to coils and reading discrete inputs.
"""
try:
self.client.write_coils(0, [self.relay_0, self.relay_1], slave=self.slave_id)
input_response = self.client.read_discrete_inputs(0, 2, slave=self.slave_id)
@ -19,14 +33,18 @@ class Hardware:
except Exception as e:
print(f"Modbus error: {e}")
def set_relay_0(self, value):
def set_relay_0(self, value: bool) -> None:
"""Set the state of relay 0."""
self.relay_0 = value
def set_relay_1(self, value):
def set_relay_1(self, value: bool) -> None:
"""Set the state of relay 1."""
self.relay_1 = value
def get_button_0(self):
def get_button_0(self) -> bool:
"""Get the state of button 0."""
return self.button_0
def get_button_1(self):
def get_button_1(self) -> bool:
"""Get the state of button 1."""
return self.button_1

View File

@ -1,6 +1,13 @@
from src.buttons_timers_and_relays import ButtonsTimersAndRelays
from src.hardware import Hardware
buttons_timers_relays = ButtonsTimersAndRelays()
def loop(hardware):
buttons_timers_relays.loop(hardware)
def loop(hardware: Hardware) -> None:
"""
Main logic loop for the Arduino Python Modbus Example.
Args:
hardware (Hardware): The hardware interface object.
"""
buttons_timers_relays.update_state_machine(hardware)

60
src/util.py Normal file
View File

@ -0,0 +1,60 @@
import argparse
import time
import serial.tools.list_ports
from pymodbus.client import ModbusSerialClient
from src.config import MODBUS_CONFIG, MAX_RECONNECTION_ATTEMPTS, INITIAL_BACKOFF_TIME
def get_default_com_port() -> str:
"""
Get the first available COM port.
Returns:
str: The name of the first available COM port.
"""
ports = list(serial.tools.list_ports.comports())
return ports[0].device
def parse_commandline_args() -> argparse.Namespace:
"""
Parse command line arguments.
Returns:
argparse.Namespace: Parsed command line arguments.
"""
parser = argparse.ArgumentParser(description='Arduino Python Modbus Example')
parser.add_argument('--com_port', type=str, default=get_default_com_port(),
help='COM port for the Modbus connection')
parser.add_argument('--slave_id', type=int, default=1,
help='Slave ID for the Modbus device')
parser.add_argument('--cycle_time', type=float, default=0.05,
help='Cycle time in seconds for the main loop')
return parser.parse_args()
def connect_modbus_client(args: argparse.Namespace) -> ModbusSerialClient:
"""
Connect to the Modbus client with exponential backoff.
Args:
args (argparse.Namespace): Parsed command line arguments.
Returns:
ModbusSerialClient: Connected Modbus client.
Raises:
ConnectionError: If unable to connect after maximum attempts.
"""
client = ModbusSerialClient(
port=args.com_port,
**MODBUS_CONFIG
)
for attempt in range(MAX_RECONNECTION_ATTEMPTS):
if client.connect():
time.sleep(3)
return client
backoff_time = INITIAL_BACKOFF_TIME * (2 ** attempt)
print(f"Connection attempt {attempt + 1} failed. Retrying in {backoff_time} seconds...")
time.sleep(backoff_time)
raise ConnectionError("Failed to connect to Modbus client after maximum attempts")