Source code for pygmc.devices.device_rfc1201

import datetime
import logging
import struct
from typing import Iterator, Tuple

from .device import BaseDevice

logger = logging.getLogger("pygmc.devices.rfc1201")


class DeviceRFC1201(BaseDevice):
    """GMC device interface; see ``__init__`` for the supported models."""

    def __init__(self, connection):
        """
        A GMC device.

        Can be used with:
        GMC-280, GMC-300, GMC-320

        Parameters
        ----------
        connection : pygmc.Connection
            A connection interface to the USB device.
        """
        super().__init__(connection)

        # Overwrite entries of the inherited config spec map and add the items
        # specific to this device family. Each entry gives the byte offset
        # ("index") and length ("size") of a field inside the config blob.
        # NOTE(review): "type" looks like a struct format ("<f" = little-endian
        # float) consumed by _parse_cfg, with None meaning "no conversion" --
        # confirm against BaseDevice.
        self._cfg_spec_map.update(
            {
                "Calibration_uSv_0": {
                    "index": 10,
                    "size": 4,
                    "description": "",
                    "type": "<f",
                },
                "Calibration_uSv_1": {
                    "index": 16,
                    "size": 4,
                    "description": "",
                    "type": "<f",
                },
                "Calibration_uSv_2": {
                    "index": 22,
                    "size": 4,
                    "description": "",
                    "type": "<f",
                },
                "IdleTextState": {
                    "index": 26,
                    "size": 1,
                    "description": "??",
                    "type": None,
                },
                "AlarmValue_uSv": {
                    "index": 27,
                    "size": 4,
                    "description": "",
                    "type": "<f",
                },
                "Baudrate": {
                    "index": 57,
                    "size": 1,
                    # see https://www.gqelectronicsllc.com/forum/topic.asp?TOPIC_ID=4948 reply#12
                    "description": "64=1200,160=2400,208=4800,232=9600,240=14400,244=19200,248=28800,250=38400,252=57600,254=115200",
                    "type": None,
                },
                "Threshold_uSv": {
                    "index": 65,
                    "size": 4,
                    "description": "",
                    "type": "<f",
                },
            }
        )

    def get_cpm(self) -> int:
        """
        Get CPM counts-per-minute data

        Returns
        -------
        int
            Counts per minute
        """
        # A 16 bit unsigned integer is returned.
        # In total 2 bytes data return from GQ GMC unit.
        # The first byte is MSB byte data and second byte is LSB byte data.
        # e.g.: 00 1C the returned CPM is 28.
        cmd = b"<GETCPM>>"
        result = self.connection.get_exact(cmd, expected=b"", size=2)
        # ">H" = big-endian unsigned 16-bit, matching the MSB-first layout.
        (count,) = struct.unpack(">H", result)
        return count

    def get_usv_h(self) -> float:
        """
        Get µSv/h

        Uses device calibration config.

        Returns
        -------
        float
            µSv/h
        """
        # lazily load config... i.e. don't load it until it's needed.
        if not self._config:
            self.get_config()
        # Not 100% sure on this...
        # µSv/h = (CPM / CalibrationCPM_1) * Calibration_uSv_1
        usv_h = (self.get_cpm() / self._config["CalibrationCPM_1"]) * self._config[
            "Calibration_uSv_1"
        ]
        return usv_h

    def get_gyro(self) -> Tuple[int, int, int]:
        """
        Get gyroscope data.

        No units specified in spec RFC1801 nor RFC1201 :(

        Returns
        -------
        Tuple[int, int, int]
            (X, Y, Z) gyroscope data
        """
        cmd = b"<GETGYRO>>"
        # Return: Seven bytes gyroscope data in hexadecimal:
        #   BYTE1,BYTE2,BYTE3,BYTE4,BYTE5,BYTE6,BYTE7
        # Here: BYTE1,BYTE2 are the X position data in 16 bits value.
        #       BYTE3,BYTE4 are the Y position data in 16 bits value.
        #       BYTE5,BYTE6 are the Z position data in 16 bits value.
        #       In each pair the first byte is MSB and the second byte is LSB.
        #       BYTE7 always 0xAA
        result = self.connection.get_exact(cmd, expected=b"", size=7)
        # Three big-endian signed 16-bit values plus the trailing marker byte,
        # which is discarded.
        x, y, z, _ = struct.unpack(">hhhB", result)
        return x, y, z

    def get_voltage(self) -> float:
        """
        Get device voltage

        Returns
        -------
        float
            Device voltage in volts
        """
        cmd = b"<GETVOLT>>"
        raw = self.connection.get_exact(cmd, expected=b"", size=1)
        # The single byte is the voltage in tenths of a volt.
        # example: b'*'.hex() -> '2a' -> int('2a', 16) -> 42 -> 4.2V
        return int(raw.hex(), 16) / 10

    def get_datetime(self) -> datetime.datetime:
        """
        Get device datetime

        Returns
        -------
        datetime.datetime
            Device datetime

        Raises
        ------
        ValueError
            If the device reports field values that do not form a valid
            date/time (raised by ``datetime.datetime``).
        """
        # Return: Seven bytes data: YY MM DD HH MM SS 0xAA
        cmd = b"<GETDATETIME>>"
        data = self.connection.get_exact(cmd, expected=b"", size=7)
        # The year byte is a two-digit offset from 2000. Add it arithmetically:
        # formatting it into a "20YY" string with a width-2 pad produced
        # "20 5" for single-digit years, which int() rejects.
        year = 2000 + data[0]
        month = data[1]
        day = data[2]
        hour = data[3]
        minute = data[4]
        second = data[5]
        # data[6] is the trailing 0xAA marker and is ignored.
        return datetime.datetime(year, month, day, hour, minute, second)

    def get_config(self) -> dict:
        """
        Get device config

        Requests the 256-byte config blob from the device, parses it with
        ``_parse_cfg`` and returns ``self._config``.

        Returns
        -------
        dict
        """
        cmd = b"<GETCFG>>"
        # Clear the connection buffers before requesting the config blob.
        self.connection.reset_buffers()
        cfg_bytes = self.connection.get_exact(cmd, expected=b"", size=256)
        self._parse_cfg(cfg_bytes)
        return self._config

    def heartbeat_on(self) -> None:
        """
        Turn heartbeat ON.

        CPS data is automatically written to the buffer every second.
        """
        self.connection.write(b"<HEARTBEAT1>>")
        logger.debug("Heartbeat ON")

    def heartbeat_off(self) -> None:
        """
        Turn heartbeat OFF.

        Stop writing data to buffer every second.
        """
        self.connection.write(b"<HEARTBEAT0>>")
        # Reset the buffers right after turning the heartbeat off.
        self.connection.reset_buffers()
        logger.debug("Heartbeat OFF")

    def heartbeat_live(self, count=60) -> Iterator[int]:
        """
        Get live CPS data, as a generator.

        i.e. yield CPS as available. This method only reads from the
        connection; it does not send the heartbeat-on command itself.

        Parameters
        ----------
        count : int, optional
            How many CPS counts to yield (default=60). Theoretically, 1 count = 1 second.
            Wall-clock time can be a bit higher or lower.

        Yields
        ------
        int
            CPS - Counts-Per-Second
        """
        self.connection.reset_buffers()
        for _ in range(count):
            raw = self.connection.read_until(expected=b"", size=2)
            # Big-endian 16-bit value; only the low 14 bits carry the count,
            # so the top two bits are masked off.
            cps = struct.unpack(">H", raw)[0] & 0x3FFF
            yield cps
    # def heartbeat_live_print(self, count=60, byte_size=4) -> None:
    #     """
    #     Print live CPS data.
    #
    #     Parameters
    #     ----------
    #     count : int, optional
    #         How many CPS counts to return (default=60). Theoretically, 1 count = 1 second.
    #         Wall-clock time can be a bit higher or lower.
    #     byte_size : int, optional
    #         Expected result size, by default 4.
    #         GMC-500=4, GMC-320=2
    #
    #     """
    #     max_ = 0
    #     i = 0
    #     self.connection.reset_buffers()
    #     for cps in self.heartbeat_live(count=count, byte_size=byte_size):
    #         i += 1
    #         if cps > max_:
    #             max_ = cps
    #         # empty leading space for terminal cursor
    #         msg = f" cps={cps:<2} | max={max_:<2} | loop={i:<10,}"
    #         print(msg, end="\r")  # Carriage return - update line we just printed