Source code for pygmc.connection.discovery

"""Discover GMC devices."""

import logging
import re
from collections import namedtuple

from serial import SerialException

from .connection import Connection
from .const import BAUDRATES
from .utils import get_gmc_usb_devices

logger = logging.getLogger("pygmc.discovery")


device_details = namedtuple("Device", ["port", "baudrate", "version", "serial_number"])


[docs] class Discovery: """Discover GMC Devices""" def __init__(self, port=None, baudrate=None, timeout=3): """ Discover GMC devices. Parameters ---------- port: str | None Dev device, port, com, if known. Leave None to auto-discover all ports. baudrate: int | None Device baudrate, if known. Leave None to auto-discover correct baudrate. """ self._timeout = timeout self._discovered_devices = [] self._discover_devices_flow(port=port, baudrate=baudrate) def _discover_devices_flow(self, port=None, baudrate=None) -> None: # reset discovered devices self._discovered_devices = [] if port and baudrate: # what are you doing in discovery? self._validate_device(port, baudrate) elif port and not baudrate: # likely the 2nd most used self._discover_with_known_port(port=port) elif baudrate and not port: # Edge case... self._discover_with_known_baudrate(baudrate=baudrate) else: # lemme just do everything for you... likely most common case self._discover_all() @staticmethod def _get_gmc_usb_ports() -> list: devices = get_gmc_usb_devices() ports = [x.device for x in devices] # make the list intuitive ports = sorted(ports) return ports @staticmethod def _get_device_info(conn): if conn._con.in_waiting != 0: # don't intrude logger.debug(f"Device has non-zero bytes waiting in in-buffer ({conn._con})") return None # Device likely has heartbeat on if conn._con.out_waiting != 0: # don't intrude logger.debug(f"Device has non-zero bytes waiting in out-buffer ({conn._con})") return None try: conn.reset_buffers() serial_number = conn.get_exact(b"<GETSERIAL>>", size=7).hex() version = conn.get_at_least(b"<GETVER>>", size=7, wait_sleep=0.01).decode() if serial_number and version: return {"serial_number": serial_number, "version": version} else: return None except Exception as e: # Unsure of exception types. logger.warning(f"{e}", exc_info=True) return None finally: conn.reset_buffers() def _validate_device(self, port, baudrate) -> bool: logger.debug(f"Checking port={port} baudrate={baudrate}") try: conn = Connection(port=port, baudrate=baudrate, timeout=self._timeout) except SerialException as e: # Should a convenience method log as warning/error? # If only GQ Electronics would put the version/serial in USB description... logger.warning(f"Skipping error connecting: {e}", exc_info=True) return False info = self._get_device_info(conn) conn.close_connection() if info: version = info["version"] serial_number = info["serial_number"] discovered_device = device_details( port=port, baudrate=baudrate, version=version, serial_number=serial_number ) logger.debug(f"Discovered device: {discovered_device}") self._discovered_devices.append(discovered_device) return True return False def _discover_with_known_baudrate(self, baudrate) -> None: ports = self._get_gmc_usb_ports() for port in ports: self._validate_device(port=port, baudrate=baudrate) def _discover_with_known_port(self, port) -> None: for baudrate in BAUDRATES: is_valid = self._validate_device(port=port, baudrate=baudrate) if is_valid: break def _discover_all(self) -> None: ports = self._get_gmc_usb_ports() for port in ports: # D.R.Y. (don't repeat yourself) self._discover_with_known_port(port=port)
[docs] def get_device_by_serial_number(self, serial_number) -> list: """ Get devices by matching serial number. May return more than one device if sym-links are used. Parameters ---------- serial_number: str Device serial number. Returns ------- list List of matching devices. """ # may return more than one device if user made multiple links matched_devices = [] for device in self._discovered_devices: if device.serial_number == serial_number: matched_devices.append(device) return matched_devices
[docs] def get_device_by_version(self, version) -> list: """ Get devices by matching version. Parameters ---------- version: str Device version regex. e.g. 'GMC-5' will match any device starting with GMC-5. Returns ------- list List of matching devices. """ matched_devices = [] for device in self._discovered_devices: if re.match(version, str(device.version), flags=re.IGNORECASE): matched_devices.append(device) return matched_devices
[docs] def get_all_devices(self) -> list: """ Get all discovered devices. Returns ------- list List of all discovered devices. """ return self._discovered_devices