Source code for pygmc.devices.device_rfc1801

import datetime
import logging
import struct
from typing import Generator, Tuple

from .device import BaseDevice

logger = logging.getLogger("pygmc.devices.rfc1801")


[docs] class DeviceRFC1801(BaseDevice): def __init__(self, connection): """ Represent a GMC device. Can be used with: GMC-500, GMC-500+, GMC-600, GMC-600+ Parameters ---------- connection : pygmc.Connection An connection interface to the USB device. """ super().__init__(connection) # Overwrites from BaseConfig & adds 1801 specific items self._cfg_spec_map.update( { "Calibration_uSv_0": { "index": 10, "size": 4, "description": "", "type": ">f", }, "Calibration_uSv_1": { "index": 16, "size": 4, "description": "", "type": ">f", }, "Calibration_uSv_2": { "index": 22, "size": 4, "description": "", "type": ">f", }, "IdleTextState": { "index": 26, "size": 1, "description": "??", "type": None, }, "AlarmValue_uSv": { "index": 27, "size": 4, "description": "", "type": ">f", }, "Baudrate": { "index": 57, "size": 1, # see https://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4948 reply#14 "description": "0=115200, 1=1200, 2=2400, 3=4800, 4=9600, 5=14400, 6=19200, 7=28800, 8=38400, 9=57600", "type": None, }, "Threshold_uSv": { "index": 65, "size": 4, "description": "", "type": ">f", }, } )
[docs] def get_cpm(self) -> int: """ Get CPM counts-per-minute data. Specs don't provide how CPM is computed nor if both high/low tubes are used. Returns ------- int Counts per minute (GMC has 2 tubes, assumed cpm accounts for both?) """ # A 32 bit unsigned integer is returned. In total 4 bytes data return from GQ GMC unit. # The first byte is MSB byte data and fourth byte is LSB byte data. # e.g.: 00 00 00 1C the returned CPM is 28. big-endian cmd = b"<GETCPM>>" result = self.connection.get_exact(cmd, expected=b"", size=4) count = struct.unpack(">I", result)[0] return count
[docs] def get_usv_h(self) -> float: """ Get µSv/h. Uses device calibration config. Returns ------- float µSv/h """ # lazily load config... i.e. don't load it until it's needed. if not self._config: self.get_config() # Not 100% sure on this... # µSv/h = (CPM / CalibrationCPM_1) * Calibration_uSv_1 usv_h = (self.get_cpm() / self._config["CalibrationCPM_1"]) * self._config[ "Calibration_uSv_1" ] return usv_h
[docs] def get_cps(self) -> int: """ Get CPS counts-per-second. Returns ------- int Counts per second """ cmd = b"<GETCPS>>" result = self.connection.get_exact(cmd, expected=b"", size=4) count = struct.unpack(">I", result)[0] return count
[docs] def get_max_cps(self) -> int: """ Get the maximum counts-per-second since the device POWERED ON. Returns ------- int Max counts per second observed """ cmd = b"<GETMAXCPS>>" result = self.connection.get_exact(cmd, expected=b"", size=4) count = struct.unpack(">I", result)[0] return count
[docs] def get_cpmh(self) -> int: """ Get CPM of the high dose tube. Only GMC-500+ supported (spec RFC1801) Returns ------- int Counts per minute on high dose tube (GMC has 2 tubes) """ # # A 32 bit unsigned integer is returned. In total 4 bytes data return from GQ GMC unit. # The first byte is MSB byte data and fourth byte is LSB byte data. # e.g.: 00 00 00 1C the returned CPM is 28. big-endian cmd = b"<GETCPMH>>" result = self.connection.get_exact(cmd, expected=b"", size=4) count = struct.unpack(">I", result)[0] return count
[docs] def get_cpml(self) -> int: """ Get CPM of the low dose tube. Only GMC-500+ supported (spec RFC1801) Returns ------- int Counts per minute on low dose tube (GMC has 2 tubes) """ cmd = b"<GETCPML>>" result = self.connection.get_exact(cmd, expected=b"", size=4) count = struct.unpack(">I", result)[0] return count
[docs] def get_datetime(self) -> datetime.datetime: """ Get device datetime. Returns ------- datetime.datetime Device datetime """ # Return: Seven bytes data: YY MM DD HH MM SS 0xAA cmd = b"<GETDATETIME>>" data = self.connection.get_exact(cmd, expected=b"", size=7) year = int("20{0:2d}".format(data[0])) month = int("{0:2d}".format(data[1])) day = int("{0:2d}".format(data[2])) hour = int("{0:2d}".format(data[3])) minute = int("{0:2d}".format(data[4])) second = int("{0:2d}".format(data[5])) return datetime.datetime(year, month, day, hour, minute, second)
[docs] def get_gyro(self) -> Tuple[int, int, int]: """ Get gyroscope data. No units specified in spec RFC1801 nor RFC1201 :( Returns ------- Tuple[int, int, int] (X, Y, Z) gyroscope data """ cmd = b"<GETGYRO>>" # Return: Seven bytes gyroscope data in hexdecimal: # BYTE1,BYTE2,BYTE3,BYTE4,BYTE5,BYTE6,BYTE7 # Here: BYTE1,BYTE2 are the X position data in 16 bits value. # The first byte is MSB byte data and second byte is LSB byte data. # BYTE3,BYTE4 are the Y position data in 16 bits value. # The first byte is MSB byte data and second byte is LSB byte data. # BYTE5,BYTE6 are the Z position data in 16 bits value. # The first byte is MSB byte data and second byte is LSB byte data. # BYTE7 always 0xAA result = self.connection.get_exact(cmd, expected=b"", size=7) x, y, z, dummy = struct.unpack(">hhhB", result) return x, y, z
[docs] def get_voltage(self) -> float: """ Get device voltage. Returns ------- float Device voltage in volts Notes ----- Device only has resolution to tenth of a volt despite example in spec RFC1801. """ # Device only has resolution to tenth of a volt despite example in spec RFC1801. cmd = b"<GETVOLT>>" result = self.connection.get_exact(cmd, expected=b"", size=5) # result example: b'4.8v\x00' result = float(result[0:3]) # e.g. float(b'4.8') return result
[docs] def get_config(self) -> dict: """ Get device config. Returns ------- dict """ cmd = b"<GETCFG>>" self.connection.reset_buffers() cfg_bytes = self.connection.get_exact(cmd, expected=b"", size=512) self._parse_cfg(cfg_bytes) return self._config
[docs] def power_off(self) -> None: """Power OFF device.""" cmd = b"<POWEROFF>>" self.connection.reset_buffers() self.connection.write(cmd)
[docs] def power_on(self) -> None: """Power ON device.""" cmd = b"<POWERON>>" self.connection.reset_buffers() self.connection.write(cmd)
[docs] def heartbeat_live(self, count=60) -> Generator[int, None, None]: """ Get live CPS data, as a generator. i.e. yield (return) CPS as available. Parameters ---------- count : int, optional How many CPS counts to return (default=60). Theoretically, 1 count = 1 second. Wall-clock time can be a bit higher or lower. Yields ------ int CPS - Counts-Per-Second int """ self.connection.reset_buffers() try: self._heartbeat_on() for i in range(count): raw = self.connection.read_until(expected=b"", size=4) cps = struct.unpack(">I", raw)[0] yield cps finally: self._heartbeat_off()
[docs] def heartbeat_live_print(self, count=60) -> None: """ Print live CPS data. Parameters ---------- count : int, optional How many CPS counts to return (default=60). Theoretically, 1 count = 1 second. Wall-clock time can be a bit higher or lower. """ max_ = 0 i = 0 for cps in self.heartbeat_live(count=count): i += 1 if cps > max_: max_ = cps # Yea, I'd rather use f-string... just trying to make it compatible with # older Python versions # empty leading space for terminal cursor msg = " cps={cps:<2} | max={max_:<2} | loop={i:<10,}".format( cps=cps, max_=max_, i=i ) print(msg, end="\r") # Carriage return - update line we just printed print("", end="\n") # empty print to move carriage return to next line
[docs] def send_key(self, key_number) -> None: """ Send key press signal to device. Note the power button acts as menu clicks and does not power on/off. Parameters ---------- key_number: int Each number represents a key-press. key=0 -> S1 (back button) key=1 -> S2 (down button) key=2 -> S3 (up button) key=3 -> S4 (power button) """ if key_number not in (0, 1, 2, 3): raise ValueError("key must be in (0, 1, 2, 3)") cmd = "<KEY{}>>".format(key_number).encode() self.connection.write(cmd)
[docs] def set_datetime(self, datetime_=None) -> None: """ Set datetime on device. Parameters ---------- datetime_: None | datetime.datetime Datetime to set. Default=None uses current time on computer i.e. datetime.datetime.now() Raises ------ ValueError Year value earlier than 2000. RuntimeError Unexpected response from device. """ if not datetime_: datetime_ = datetime.datetime.now() if datetime_.year < 2000: # welp... device has year hardcoded 20xx raise ValueError("Device can't set year earlier than 2000") dt_cmd = struct.pack( ">BBBBBB", datetime_.year - 2000, datetime_.month, datetime_.day, datetime_.hour, datetime_.minute, datetime_.second, ) cmd = b"<SETDATETIME" + dt_cmd + b">>" self.connection.reset_buffers() result = self.connection.get_exact(cmd, expected=b"", size=1) if not result == b"\xaa": raise RuntimeError("Unexpected response: {}".format(result))
[docs] def reboot(self) -> None: """ Reboot device. Note: Different from power off-on as it changes display to default. """ cmd = b"<REBOOT>>" self.connection.write(cmd)
# To-be-added soon(TM) # def _history(self, start_position, size): # # <SPIR[A2][A1][A0][L1][L0]>> # # A2,A1,A0 are three bytes address data, from MSB to LSB. # # The L1,L0 are the data length requested. L1 is high byte of 16 bit integer and L0 is low byte. # # \xff is end of file? # start = struct.pack(">I", start_position)[1:] # # 2**11 = 2048 # size = struct.pack(">H", size) # a = bytearray(start) # b = bytearray(size) # self.connection._write(b"<SPIR" + a + b + b">>") # for i in range(10): # time.sleep(0.1 * (1 + i)) # data = self.connection._read() # if len(data) > 0: # break # # data = self.connection.get(b'<SPIR' + a + b + b'>>') # return data # def _all_history(self): # data = b"" # read_chunk = 2**11 # 2048 # max_size = 2**20 # 1MB? # for start in range(0, max_size, read_chunk): # kb = start / 1024 # print("Loading {0:,.0f} kB".format(kb), end="\r") # # print('Reading', start, end='\r') # tmp = self._history(start, read_chunk) # if len(tmp) == 0: # print("no data") # break # data += tmp # if tmp.count(b"\xff") >= read_chunk * 0.5: # print("End of history") # break # return data