Source code for pygmc.devices.device

import logging
import struct

from ..history import HistoryParser

logger = logging.getLogger("pygmc.device")


[docs] class BaseDevice: def __init__(self, connection): """ Represent a base GMC device. Parameters ---------- connection : pygmc.Connection An connection interface to the USB device. """ self.connection = connection self._flash_memory_size_bytes = 2**20 # 1 MiB self._flash_memory_page_size_bytes = 2**11 # 2048 B # the config under the hood, initialize empty and lazily create self._config = dict() # Best effort interpretation from: # https://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4948 # self-documenting code to interpret config data # type=None means treat byte literally. e.g. b'\x00'[0] -> 0 # type is a string means struct.unpack type self._cfg_spec_map = { "Power": { "index": 0, "size": 1, "description": "0=ON, 1=OFF... Backwards for reasons beyond comprehension.", "type": None, }, "Alarm": { "index": 1, "size": 1, # Somehow this is not backwards like Power "description": "0=OFF, 1=ON", "type": None, }, "Speaker": { "index": 2, "size": 1, "description": "0=OFF, 1=ON", "type": None, }, "CalibrationCPM_0": { "index": 8, "size": 2, "description": "", "type": ">H", }, "CalibrationCPM_1": { "index": 14, "size": 2, "description": "", "type": ">H", }, "CalibrationCPM_2": { "index": 20, "size": 2, "description": "", "type": ">H", }, "SaveDataType": { "index": 32, "size": 1, "description": "History data; 0=off, 1=CPS, 2=CPM, 3=CPM(avg/hr)", "type": None, }, "MaxCPM": { "index": 49, "size": 2, "description": "MaxCPM Hi + Lo Byte", "type": ">H", }, "Baudrate": { "index": 57, "size": 1, # coded differently for 300 and 500/600 series # see https://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4948 reply#12 "description": "see https://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4948 reply#12", "type": None, }, "BatteryType": { "index": 56, "size": 1, "description": "0=rechargeable, 1=non-rechargeable", "type": None, }, "ThresholdMode": { "index": 64, "size": 1, "description": "0=CPM, 1=µSv/h, 2=mR/h", "type": None, }, "ThresholdCPM": { "index": 62, "size": 2, "description": "", "type": ">H", }, } # will likely save someone a lot of time # heartbeat-on keeps writing to buffer making other functionality un-parsable self._heartbeat_off() logger.debug("Initialize BaseDevice") def _heartbeat_off(self) -> None: """ Turn heartbeat OFF. Stop writing data to buffer every second. """ self.connection.write(b"<HEARTBEAT0>>") self.connection.reset_buffers() logger.debug("Heartbeat OFF") def _heartbeat_on(self) -> None: """ Turn heartbeat ON. CPS data is automatically written to the buffer every second. """ self.connection.write(b"<HEARTBEAT1>>") logger.debug("Heartbeat ON") def _read_history_position(self, start_position, chunk_size): # http://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4445 # don't need spir fix because... reset read/write buffer. start_s = struct.pack(">I", start_position)[1:] size_s = struct.pack(">H", chunk_size) cmd = b"<SPIR" + start_s + size_s + b">>" data = self.connection.get_exact(cmd, size=chunk_size) # MUST reset buffers... or deal with the bug # device with bug returns chunk_size + 1 # if we read chunk_size, then there is one byte left in buffer # which will throw off all further commands self.connection.reset_buffers() return data def _parse_cfg(self, cfg_bytes: bytes) -> None: """ Parses config bytes and sets self._config. Parameters ---------- cfg_bytes: bytes Device <GETCFG>> Returns ------- None """ cfg_map = self._cfg_spec_map for name, d in cfg_map.items(): i = d["index"] e = i + d["size"] raw = cfg_bytes[i:e] if d["type"] and d["type"] != "tbd": value = struct.unpack(d["type"], raw)[0] elif d["type"] is None: value = raw[0] else: logger.warning(f"config={name} not understood") value = None self._config[name] = value
[docs] def get_raw_history(self): """ Get device history data. Stops reading when read entire page contains empty data. Full 1 MiB read takes ~5 minutes on the slower 57,600 baudrate Returns ------- bytes Raw history data. """ i = 0 hist = b"" for start_position in range( 0, self._flash_memory_size_bytes, self._flash_memory_page_size_bytes ): data = self._read_history_position( start_position, self._flash_memory_page_size_bytes ) if data.count(b"\xff") == self._flash_memory_page_size_bytes: logger.debug("Entire read block '\\xff' stop reading history") break hist += data i += 1 logger.debug("Read history page {} done".format(i)) return hist
[docs] def save_history(self, file_path): """ Download device memory history and save to file. Parameters ---------- file_path: str Path to save. """ data = self.get_raw_history() with open(file_path, "wb") as f: f.write(data)
[docs] def get_history_data(self): """ Get tidy device memory history in a list of tuples. First row is column names. Columns: "datetime", "count", "unit", "mode", "reference_datetime", "notes" Returns ------- list: List of tuples, first row is column names. """ data = self.get_raw_history() h = HistoryParser(data=data) data = [h.get_columns()] data.extend(h.get_data()) return data
[docs] def get_version(self) -> str: """ Get version of device. Has a sleep wait to read as spec RFC1801 doesn't specify end char nor byte size. i.e. SLOW. Returns ------- str Device version """ cmd = b"<GETVER>>" self.connection.reset_buffers() result = self.connection.get_at_least(cmd=cmd, size=7, wait_sleep=0.05) return result.decode("utf8")
[docs] def get_serial(self) -> str: """Get serial.""" cmd = b"<GETSERIAL>>" self.connection.reset_buffers() result = self.connection.get_exact(cmd, size=7) return result.hex()
[docs] def get_connection_details(self): """ Get connection details from pyserial. Returns ------- dict """ return self.connection.get_connection_details()