"""
Represent a USB connection to a GMC.
This is the communication class.
"""
import inspect
import logging
import time
# pypi
import serial
from .const import BAUDRATES
# Module-level logger used by the code in this file; configure the
# "pygmc.connection" logger to control its output.
logger = logging.getLogger("pygmc.connection")
class Connection:
    """
    Represent a connection to a GMC device.

    A wrapper around pyserial with common operations for a GMC device.
    """

    # Responses longer than this many bytes are summarised at debug level so
    # that e.g. history downloads do not flood the log.
    _MAX_LOGGED_RESPONSE_LEN = 50
    # Numeric level (one below logging.DEBUG) at which full responses are logged.
    # Seeing them is as easy as: logging.basicConfig(level=9)
    _FULL_RESPONSE_LOG_LEVEL = 9

    def __init__(self, port, baudrate, timeout=5, serial_connection=None):
        """
        Represent a connection to a GMC device.

        Parameters
        ----------
        port: str
            Dev device, port, com to connect to.
            On linux, it's usually /dev/ttyUSB0 and on windows, it's usually COM3.
        baudrate: int
            Speed of communication over serial USB. Must be a compatible value.
            A value missing from BAUDRATES is only reported at error level; the
            connection is still attempted with it.
        timeout : int, optional
            Serial connection timeout, seconds, by default 5
        serial_connection: serial.Serial
            An initialized Serial instance. When given, it is used as-is and
            port/baudrate/timeout are ignored.
        """
        # Set first so __repr__ and close_connection stay usable even when
        # opening the port below raises.
        self._con = None

        # pyserial renamed the first keyword of read_until() between releases:
        # 'terminator' for serial==3.4, 'expected' for serial==3.5.
        # Passing the wrong name fails with:
        #   TypeError: SerialBase.read_until() got an unexpected keyword
        #   argument 'expected'
        # so look up the name the installed version actually uses.
        try:
            signature = inspect.signature(serial.SerialBase.read_until)
            # e.g. ['self', 'terminator', 'size'] -> index 1 is the one we need
            self._read_until_param_name = list(signature.parameters)[1]
        except Exception:  # noqa - best effort, fall back to a guess
            logger.exception("Unable to resolve read_until param name")
            self._read_until_param_name = "expected"  # just guess

        # The connection
        if serial_connection:
            logger.debug("User provided serial connection.")
            self._con = serial_connection
        else:
            if baudrate not in BAUDRATES:
                logger.error(
                    "Input baudrate=%s not in known compatible rates.", baudrate
                )
                logger.error("Known compatible baudrates=%s", BAUDRATES)
                logger.error("To force baudrate, pass in your own serial_connection")
            self._con = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)

    def __repr__(self):
        """Use pyserial __repr__ when a connection exists, else the default one."""
        con = getattr(self, "_con", None)
        if con is not None:
            return repr(con)
        return super().__repr__()

    def __str__(self):
        """Use pyserial __repr__"""
        # maybe a nicer str?
        return str(self.__repr__())

    @classmethod
    def _log_response(cls, result, label="response"):
        """
        Log a device response without flooding the log.

        Short responses are logged in full at debug level. Longer ones are
        logged as a length-only summary at debug level and in full at level 9.

        Parameters
        ----------
        result : bytes
            Data read from the device.
        label : str, optional
            Prefix used in the log message, by default "response"
        """
        size = len(result)
        if size <= cls._MAX_LOGGED_RESPONSE_LEN:
            logger.debug("%s=%s", label, result)
            return
        # reading history data pollutes the logs with T.M.I.
        logger.debug(
            "%s-len=%d (set log level=9 to log full response)", label, size
        )
        # Lazy %-args: the (possibly large) payload is only formatted when
        # level 9 is actually enabled.
        logger.log(cls._FULL_RESPONSE_LOG_LEVEL, "%s=%s", label, result)

    def get_connection_details(self) -> dict:
        """
        Get connection details.

        Values of None means not available or not applicable.

        Returns
        -------
        dict
            Keys: port, baudrate, is_open, in_waiting, out_waiting, timeout.
        """
        keys = ("port", "baudrate", "is_open", "in_waiting", "out_waiting", "timeout")
        # getattr with a default reads each attribute once; attributes the
        # underlying connection does not have are reported as None.
        return {key: getattr(self._con, key, None) for key in keys}

    def close_connection(self) -> None:
        """Close connection. Does nothing when there is no connection."""
        if self._con is None:
            return
        logger.info("Close connection: %s", self._con)
        self._con.close()

    def reset_buffers(self) -> None:
        """
        Reset input & output buffers on pyserial connection.

        reset_input_buffer(): Clear input buffer, discarding all that is in the buffer.
        reset_output_buffer(): Clear output buffer, aborting the current output and
        discarding all that is in the buffer.
        """
        logger.debug("reset_input_buffer")
        self._con.reset_input_buffer()
        logger.debug("reset_output_buffer")
        self._con.reset_output_buffer()

    def write(self, cmd: bytes, log: bool = True) -> None:
        """
        Write command to device and flush the connection.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        log : bool
            Default=True to log cmd at debug level. Set false when writing sensitive
            information such as WiFi password.
        """
        if log:
            logger.debug("write='%s'", cmd)
        else:
            # Deliberately omit the payload (may contain secrets).
            logger.debug("writing cmd")
        self._con.write(cmd)
        self._con.flush()

    def read(self, wait_sleep=0.3) -> bytes:
        """
        Read all available data.

        Which may be incomplete. (noob/newbie method)

        Parameters
        ----------
        wait_sleep : float, optional
            Time to sleep to give device time to write, by default 0.3

        Returns
        -------
        bytes
            Device response
        """
        # Returns everything currently in the buffer, i.e. it may be incomplete,
        # so wait a bit before reading.
        time.sleep(wait_sleep)
        # Prefer read_all() when the connection object offers it; otherwise
        # implement it manually so older pyserial releases keep working.
        if hasattr(self._con, "read_all"):
            logger.debug("read_all")
            result = self._con.read_all()
        else:
            # in_waiting - Return the number of bytes currently in the input buffer.
            logger.debug("read(in_waiting)")
            result = self._con.read(self._con.in_waiting)
        self._log_response(result)
        return result

    def read_until(self, size=None, expected=b"") -> bytes:
        r"""
        Read device data until expected LF is reached or expected result size is reached.

        Waits until conditions met or timeout.
        Some data has '\n' which causes reading to stop.
        default changed from b'\n' to b''

        Parameters
        ----------
        size : None | int, optional
            Length of expected bytes, by default None
        expected : bytes, optional
            Expected end character, by default b''

        Returns
        -------
        bytes
            Device response
        """
        logger.debug("read_until(size=%s, expected=%s)", size, expected)
        # The keyword name differs between pyserial versions; it was resolved
        # once in __init__.
        kwargs = {self._read_until_param_name: expected, "size": size}
        result = self._con.read_until(**kwargs)
        self._log_response(result)
        return result

    def read_at_least(self, size, wait_sleep=0.05) -> bytes:
        """
        Read at least <size> bytes then wait <wait_sleep> and read the buffer.

        i.e. Wait as long as needed to get at-least <size> bytes then wait <wait_sleep>
        seconds and read whatever else is ready in the buffer.

        Parameters
        ----------
        size: int
            Minimum size expected to read or timeout.
        wait_sleep: float | int
            Time to wait in seconds to check if there's anything remaining in the buffer.

        Notes
        -----
        This method resets the input & output buffers after; incase there was extra info
        that would've been added to the buffers.
        This is useful for ill-defined specs where there is no exact size prescribed and
        not waiting enough may result in empty/partial response and waiting too long is
        wasteful if the response was ready quickly.

        Returns
        -------
        bytes
        """
        logger.debug("read_at_least(size=%s, wait_sleep=%s)", size, wait_sleep)
        # read until size or timeout
        min_size_result = self.read_until(size=size, expected=b"")
        # then pick up whatever else arrived shortly after
        extra_result = self.read(wait_sleep=wait_sleep)
        result = min_size_result + extra_result
        self._log_response(result, label="combined-response")
        self.reset_buffers()
        return result

    def get(self, cmd, wait_sleep=0.3) -> bytes:
        """
        Write command to device and get response.

        Only use in development/learning environment.
        May give incomplete/empty response if device is busy.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        wait_sleep : float, optional
            Time to sleep to give device time to write, by default 0.3

        Returns
        -------
        bytes
            Device response
        """
        logger.debug("get(cmd=%s, wait_sleep=%s)", cmd, wait_sleep)
        self.write(cmd)
        return self.read(wait_sleep=wait_sleep)

    def get_at_least(self, cmd: bytes, size: int, wait_sleep=0.05) -> bytes:
        """
        Write cmd, read at least <size> bytes then wait <wait_sleep> and read the buffer.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        size: int
            Minimum size expected to read or timeout.
        wait_sleep : float, optional
            Time to sleep (seconds) to give device time to write, by default 0.05

        Notes
        -----
        The reason for this method is due to the unspecified expected length in GETVER.
        We write the command then we must wait for the device to write the response.
        For GETVER on GMC300S:
        .get() wait=0.1s, two-hundred loops took 20.4 seconds and failed 13%
        .get_at_least() wait=0.05s, two-hundred loops took 15.3 seconds and 0% failed.

        Returns
        -------
        bytes
            Device response
        """
        logger.debug(
            "get_at_least(cmd=%s, size=%s, wait_sleep=%s)", cmd, size, wait_sleep
        )
        self.write(cmd)
        return self.read_at_least(size=size, wait_sleep=wait_sleep)

    def get_exact(self, cmd, size=None, expected=b"") -> bytes:
        """
        Write and read exact.

        Write command to device, provide expected LF or size (bytes),
        wait until either LF, size, or timeout is reached,
        then return device response.

        Parameters
        ----------
        cmd : bytes
            Write command e.g. <GETVER>>
        size : int | None, optional
            Expected response size, by default None
        expected : bytes, optional
            Expected end char, by default b''

        Returns
        -------
        bytes
            Device response
        """
        logger.debug("get_exact(cmd=%s, size=%s, expected=%s)", cmd, size, expected)
        self.write(cmd)
        return self.read_until(expected=expected, size=size)