import inspect
import logging
import time
# pypi
import serial
from serial.tools import list_ports as serial_list_ports
logger = logging.getLogger("pygmc.connection")
class Connection:
    """
    Connection to a GMC device.

    Either user provided parameters or a best-guess auto-connect.
    Effectively a wrapper for pyserial for GMC specific tasks.
    """

    def __init__(self, timeout=5):
        """
        Connection to a GMC device.

        Parameters
        ----------
        timeout : int, optional
            serial connection timeout, seconds, by default 5
        """
        # on windows it's usually COM3
        # on linux it's usually /dev/ttyUSB0
        # http://www.gqelectronicsllc.com/downloads/ to look for updates? AIR-760 has no protocol docs :(
        # baudrates from GQ-RFC1201 & GQ-RFC1801
        # http://www.gqelectronicsllc.com/download/GQ-RFC1201.txt
        # http://www.gqelectronicsllc.com/download/GQ-RFC1801.txt
        # Candidate baudrates, probed in this order by _find_correct_baudrate().
        self._baudrates = [
            115200,
            57600,
            38400,
            28800,
            19200,
            14400,
            9600,
            4800,
            2400,
            1200,
        ]
        logger.debug(f"Connection timeout={timeout}")
        self._timeout = timeout  # seconds
        self._baudrate = 115200  # default
        self._con = None  # set by one of the connect* methods

        # pyserial has a breaking change from 3.4 to 3.5
        # TypeError: SerialBase.read_until() got an unexpected keyword argument 'expected'
        # 'terminator' for serial==3.4, 'expected' for serial==3.5
        # Inspect the installed signature so read_until() can be called by keyword
        # regardless of which name this pyserial version uses.
        try:
            signature = inspect.signature(serial.SerialBase.read_until)
            # e.g. ['self', 'terminator', 'size'] -> index 1 is the end-marker parameter
            self._read_until_param_name = list(signature.parameters)[1]
        except Exception:  # noqa - deliberate best-effort, fall back to a guess
            logger.exception("Unable to resolve read_until param name")
            self._read_until_param_name = "expected"  # just guess

    def _test_con(self) -> bool:
        """
        Test connection by writing a command and checking that a response came back.

        No prescribed method of confirming a GMC device in specs :(
        Would've liked to use <GETVER>> as test but...
        spec GQ-RFC1201 says return is 14 bytes.
        spec GQ-RFC1801 doesn't specify.
        Picking <GETSERIAL>> as it's specified in both specs; 7 bytes

        Returns
        -------
        bool
            True: validated connection. False: unexpected response.
        """
        self.reset_buffers()
        try:
            serial_number = self.get_exact(b"<GETSERIAL>>", size=7)
        except Exception as e:
            # Unsure of exception types.
            logger.warning(f"{e}", exc_info=True)
            return False

        # A short read (timeout) means the device did not answer as expected.
        if len(serial_number) != 7:
            logger.warning(f"Unexpected response: {serial_number}")
            return False

        # Not 100% sure... no prescribed method of confirming
        # we're connected to a GMC device in specs
        logger.debug(f"Test connection serial: {serial_number}")
        return True

    def _check_baudrate(self, con) -> bool:
        """
        Check that an open serial connection can write a command and read a reply.

        Sends <GETSERIAL>> and expects exactly 7 bytes back.

        Parameters
        ----------
        con : serial.Serial
            An already opened serial connection (pyserial like object)

        Returns
        -------
        bool
            True: 7 bytes were read back. False: anything else (e.g. timeout).
        """
        # perhaps always turn off heartbeat when connecting because that messes with the output buffer
        con.reset_input_buffer()
        con.reset_output_buffer()
        con.write(b"<GETSERIAL>>")
        con.flush()
        # This is to resolve pyserial breaking change. See __init__ above.
        params = {self._read_until_param_name: b"", "size": 7}
        result = con.read_until(**params)
        if len(result) == 7:
            logger.debug("Baudrate successfully wrote and read data.")
            return True
        logger.debug(f"Baudrate check returned unexpected result: {result}")
        return False

    def _find_correct_baudrate(self, port: str) -> bool:
        """
        Given a successful port, attempt/confirm a baudrate works.

        On success the working baudrate is stored in ``self._baudrate``.

        Parameters
        ----------
        port : str
            Device port

        Returns
        -------
        bool
            True: successful connection
            False: some error
        """
        for br in self._baudrates:
            logger.debug(f"Checking baudrate={br} for port={port}")
            try:
                # A successful connection doesn't mean the baudrate can read/write.
                con = serial.Serial(port, baudrate=br, timeout=1)
                try:
                    works = self._check_baudrate(con)
                finally:
                    # Always release the probe handle, even if the check raised,
                    # otherwise the port stays locked for the next attempt.
                    con.close()
            except (OSError, serial.SerialException) as e:
                # SerialException – In case the device can not be found or can not be configured.
                logger.warning(f"{e}", exc_info=True)
                continue
            if works:
                self._baudrate = br
                logger.debug(f"Baudrate={br} wrote and read data.")
                return True
        return False

    def _connect_with_detected_baudrate(self, port: str) -> bool:
        """
        Probe a port for a working baudrate and, if found, open the connection.

        Parameters
        ----------
        port : str
            Device port

        Returns
        -------
        bool
            True: ``self._con`` is now an open connection on ``port``.
            False: no baudrate produced a valid response.
        """
        if not self._find_correct_baudrate(port=port):
            return False
        self._con = serial.Serial(
            port=port, baudrate=self._baudrate, timeout=self._timeout
        )
        logger.info(f"Connected to {self._con.port}")
        return True

    @staticmethod
    def _get_available_usb_devices(regexp=None, include_links=True) -> list:
        """
        Get all available USB devices.

        Parameters
        ----------
        regexp : None | str, optional
            Search for ports using a regular expression. Port name, description and
            hardware ID are searched.
            hardwareID example ('USB VID:PID=1A86:7523 LOCATION=2-1')
            Default=None, find all.
        include_links : bool, optional
            include symlinks under /dev when they point to a serial port, by default True

        Returns
        -------
        list
            available ports, type [serial.tools.list_ports_linux.SysFS]
        """
        logger.debug(
            f"_get_available_usb_devices(regexp={regexp}, include_links={include_links})"
        )
        # Materialise as a list in both branches: the result is iterated more than
        # once below and grep() is a generator.
        if not regexp:
            _ports = list(serial_list_ports.comports(include_links=include_links))
        else:
            _ports = list(
                serial_list_ports.grep(regexp=regexp, include_links=include_links)
            )
        logger.debug(f"All ports found: {[(x.device, x.hwid) for x in _ports]}")

        # Filter out non-usb ports
        # e.g. hwid='USB VID:PID=1A86:7523 LOCATION=2-1'
        ports = [
            port
            for port in _ports
            if isinstance(port.hwid, str) and port.hwid.startswith("USB")
        ]
        logger.debug(
            f"USB ports/dev-devices found: {[(x.device, x.hwid) for x in ports]}"
        )
        return ports

    def connect(
        self,
        port=None,
        baudrate=None,
        vid=None,
        pid=None,
        description=None,
        hardware_id=None,
    ) -> None:
        """
        Connect to device.

        If all parameters are None, the auto-connect flow is used which attempts to
        connect to all available USB serial ports.
        If ANY parameter is given; it's used to refine the search, any matches are considered.
        Parameters are used as an OR search.

        Parameters
        ----------
        port : str | None, optional
            Exact port (device dev path / com port) e.g. '/dev/ttyUSB0'
            If port is specified, the following kwargs are ignored: vid, pid, description,
            hardware_id.
        baudrate: int | None
            Device baudrate. Leave None to auto-detect baudrate. Only applicable when port
            is specified.
        vid : str | None, optional
            Device vendor ID as hex, by default None
        pid : str | None, optional
            Device product ID as hex, by default None
        description : str | None, optional
            Device description, by default None
        hardware_id : str | None, optional
            Device hwid, by default None
            e.g. 'USB VID:PID=1A86:7523 LOCATION=2-1'
            Use hex for vid:pid input

        Raises
        ------
        ConnectionError
            If no baudrate works on the given port, or no candidate port responds
            like a GMC device.
        """
        if port and baudrate:
            self.connect_exact(port, baudrate)
        elif port:
            if not self._connect_with_detected_baudrate(port):
                raise ConnectionError(f"Unable to connect to: {port}")
        else:
            # ANY match, first match, becomes the device
            inputs = [vid, pid, description, hardware_id]
            if all(v is None for v in inputs):
                # no user info to go on... let's see what we can do...
                ports = self._get_available_usb_devices()
            else:
                regexp = "|".join([x for x in inputs if x])
                logger.debug(f"serial.tools.list_ports.grep({regexp})")
                ports = self._get_available_usb_devices(regexp=regexp)

            for avail_port in ports:
                device = avail_port.device  # e.g. /dev/ttyUSB0
                logger.debug(device)
                if self._connect_with_detected_baudrate(device):
                    break
            else:
                # Loop finished without a break: nothing answered (or no ports at all).
                tried = [p.device for p in ports]
                raise ConnectionError(
                    f"Unable to connect to a GMC device. Ports tried: {tried}"
                )

        logger.info(f"Connected: {self._con}")

    def connect_exact(self, port, baudrate) -> None:
        """
        Connect with exact user provided parameters.

        No searching port, no searching baudrate. i.e. fast.

        Parameters
        ----------
        port : str
            Port. e.g. linux /dev/ttyUSB0 or windows COM3
        baudrate : int
            Baudrate e.g. 115200
        """
        logger.debug(f"Exact connect attempt: port={port} baudrate={baudrate}")
        logger.log(level=9, msg="User knows their #2")  # level lower than DEBUG=10
        self._con = serial.Serial(port=port, baudrate=baudrate, timeout=self._timeout)
        logger.info(f"Connected: {self._con}")

    def connect_user_provided(self, connection) -> None:
        """
        User does their own thing and gives a serial.Serial like class.

        Parameters
        ----------
        connection : serial.Serial
            A serial.Serial like class (pyserial)
        """
        # instance of serial.Serial
        logger.log(level=9, msg="User knows their #2^2")  # level lower than DEBUG=10
        logger.info(f"User provided connection: {connection}")
        self._con = connection  # used as-is, no validation
        logger.info(f"Connected: {self._con}")

    def close_connection(self) -> None:
        """
        Close connection.

        Does nothing if no connection was ever established.
        """
        if self._con is None:
            return
        logger.info(f"Close connection: {self._con}")
        self._con.close()

    def reset_buffers(self) -> None:
        """
        Reset input & output buffers on pyserial connection.

        reset_input_buffer(): Clear input buffer, discarding all that is in the buffer.
        reset_output_buffer(): Clear output buffer, aborting the current output and
        discarding all that is in the buffer.
        """
        # Clear input buffer, discarding all that is in the buffer.
        logger.debug("reset_input_buffer")
        self._con.reset_input_buffer()
        # Clear output buffer, aborting the current output and discarding all that is in the buffer.
        logger.debug("reset_output_buffer")
        self._con.reset_output_buffer()

    def write(self, cmd: bytes) -> None:
        """
        Write command to device.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        """
        logger.debug(f"write='{cmd}'")
        self._con.write(cmd)
        self._con.flush()

    def read(self, wait_sleep=0.3) -> bytes:
        """
        Read all available data... which may be incomplete. (noob/newbie method)

        Parameters
        ----------
        wait_sleep : float, optional
            Time to sleep to give device time to write, by default 0.3

        Returns
        -------
        bytes
            Device response
        """
        # return everything currently in device buffer i.e. may be incomplete so wait a bit before read
        time.sleep(wait_sleep)
        # in pyserial==3.5 method added .read_all()
        # Read all bytes currently available in the buffer of the OS.
        # BUT... not available in pyserial==3.4
        # ADDITIONALLY, https://pyserial.readthedocs.io/en/latest/index.html says latest yet refers to 3.4
        # SO... lets make this requirement 3.4 and manually implement read_all()
        if hasattr(self._con, "read_all"):
            logger.debug("read_all")
            result = self._con.read_all()
        else:
            # in_waiting - Return the number of bytes currently in the input buffer.
            logger.debug("read(in_waiting)")
            result = self._con.read(self._con.in_waiting)
        logger.debug(f"response={result}")
        return result

    def read_until(self, expected=b"", size=None) -> bytes:
        r"""
        Read device data until expected LF is reached or expected result size is reached.

        Waits until conditions met or timeout.
        Some data has \n which causes reading to stop. default changed from b'\n' to b''

        Parameters
        ----------
        expected : bytes, optional
            Expected end character, by default b''
        size : None | int, optional
            Length of expected bytes, by default None

        Returns
        -------
        bytes
            Device response
        """
        logger.debug(f"read_until(expected={expected}, size={size})")
        # This is to resolve pyserial breaking change. See __init__ above.
        params = {self._read_until_param_name: expected, "size": size}
        result = self._con.read_until(**params)
        logger.debug(f"response={result}")
        return result

    def get(self, cmd, wait_sleep=0.3) -> bytes:
        """
        Write command to device and get response.

        Only use in development/learning environment.
        May give incomplete/empty response if device is busy.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        wait_sleep : float, optional
            Time to sleep to give device time to write, by default 0.3

        Returns
        -------
        bytes
            Device response
        """
        logger.debug(f"get(cmd={cmd}, wait_sleep={wait_sleep})")
        self.write(cmd)
        result = self.read(wait_sleep=wait_sleep)
        logger.debug(f"response={result}")
        return result

    def get_exact(self, cmd, expected=b"", size=None) -> bytes:
        """
        Write command to device, provide expected LF or size (bytes),
        wait until either LF, size, or timeout is reached,
        then return device response.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        expected : bytes, optional
            Expected end char, by default b''
        size : int | None, optional
            Expected response size, by default None

        Returns
        -------
        bytes
            Device response
        """
        logger.debug(f"get_exact(cmd={cmd}, expected={expected}, size={size})")
        self.write(cmd)
        return self.read_until(expected=expected, size=size)