import time
from re import compile
from datetime import datetime
from struct import *

try:
    from serial import Serial
except ImportError as _serial_import_error:
    # pyserial is missing: keep the frame parser importable and defer the
    # failure to the moment a serial port is actually opened.
    _SERIAL_IMPORT_MESSAGE = str(_serial_import_error)

    def Serial(*args, **kwargs):
        """Stand-in for serial.Serial that re-raises the import failure."""
        raise ImportError(_SERIAL_IMPORT_MESSAGE)


class State:
    """One set of sensor readings, stamped with its creation time.

    Each sensor attribute starts as a list of ``None`` placeholders;
    ``Frame.convert`` replaces exactly one of them with the decoded tuple.
    """

    # Class-level defaults, kept so ``State.acc`` etc. remain readable.
    acc = [None]*3
    mag = [None]*3
    lgt = [None]*1
    tch = [None]*2
    a2d = [None]*6
    time = None

    def __init__(self):
        # Give every instance its own placeholder lists so that an in-place
        # edit of one State cannot leak into the others via the class lists.
        self.acc = [None]*3
        self.mag = [None]*3
        self.lgt = [None]*1
        self.tch = [None]*2
        self.a2d = [None]*6
        self.time = datetime.utcnow()


class Frame:
    """Incremental parser for binary sensor frames.

    Layout handled by ``parse``::

        [HEADER] [bits] [count] [count * wordsize data bytes] [TAIL]

    ``bits`` carries the sensor type in the high nibble and the data type in
    the low nibble; data words are decoded little-endian by ``convert``.
    """

    # Sensor types
    SNS_ACC   = 0x00
    SNS_MAG   = 0x01
    SNS_LGT   = 0x02
    SNS_TCH   = 0x03
    SNS_A2D   = 0x04
    SNS_NUM   = 0x05
    SNS_SHIFT = 4
    SNS_MASK  = 0xF0

    # Data types
    TYP_S8    = 0
    TYP_S16   = 1
    TYP_S32   = 2
    TYP_U8    = 3
    TYP_U16   = 4
    TYP_U32   = 5
    TYP_F32   = 6
    TYP_F64   = 7
    TYP_NUM   = 8
    TYP_SHIFT = 0
    TYP_MASK  = 0x0F

    # Command codes
    CMD_STOP  = 0
    CMD_START = 1
    CMD_RATE  = 2
    CMD_NUM   = 3
    CMD_SHIFT = 0
    CMD_MASK  = 0x0F

    # Frame information
    HEADER     = 0x02
    TAIL       = 0x0A
    HEADER_POS = 0
    BITS_POS   = 1
    COUNT_POS  = 2
    DATA_POS   = 3

    # Maps
    snsMap = {SNS_ACC: 'acc',
              SNS_MAG: 'mag',
              SNS_LGT: 'lgt',
              SNS_TCH: 'tch',
              SNS_A2D: 'a2d'}
    # Was written with commas instead of colons, which built a set.
    cmdMap = {CMD_START: 'start',
              CMD_STOP:  'stop',
              CMD_RATE:  'rate'}
    sizeMap = {TYP_S8:  1,
               TYP_S16: 2,
               TYP_S32: 4,
               TYP_U8:  1,
               TYP_U16: 2,
               TYP_U32: 4,
               TYP_F32: 4,
               TYP_F64: 8}
    fmtMap = {TYP_S8:  'b',
              TYP_S16: 'h',
              TYP_S32: 'i',
              TYP_U8:  'B',
              TYP_U16: 'H',
              TYP_U32: 'I',
              TYP_F32: 'f',
              TYP_F64: 'd'}
    sampleNum = {SNS_ACC: 0,
                 SNS_MAG: 0,
                 SNS_LGT: 0,
                 SNS_TCH: 0,
                 SNS_A2D: 0}

    # Parser data (class-level defaults; rebound per instance while parsing)
    index    = 0    # read index
    count    = 0    # number of items in frame
    length   = 0    # length of frame (in bytes)
    bits_sns = 0    # sensor type
    bits_typ = 0    # data type
    binary   = b""  # binary read-in (bytes, so it also works on Python 3)
    values   = []   # converted numeric data

    # Constructor
    def __init__(self):
        # sampleNum is mutated in place by convert(); keep one counter table
        # per parser instead of sharing the class-level dict.
        self.sampleNum = dict.fromkeys(Frame.snsMap, 0)
        self.values = []

    # Converters
    @staticmethod
    def findCode(dataMap, name):
        """Return the code whose entry in dataMap equals name.

        Prints an error and returns None when no entry matches.
        """
        for code in dataMap:
            if dataMap[code] == name:
                return code
        print("[ERROR] No code found")

    # Parse frame
    def parse(self, byte):
        """Consume one byte of the stream.

        byte is a length-1 byte string as returned by a serial read.
        Returns a State when this byte completes a valid frame, otherwise
        None.  An unexpected header, sensor type, data type or tail drops
        the frame and restarts the search for a header.
        """
        # An empty read carries no data; do not advance the parser.
        if not byte:
            return None

        # Save current position and advance; error paths reset index below.
        pos = self.index
        self.index += 1

        if pos == Frame.HEADER_POS:
            if ord(byte) != Frame.HEADER:
                self.index = 0

        elif pos == Frame.BITS_POS:
            bits = ord(byte)
            self.bits_sns = (bits & Frame.SNS_MASK) >> Frame.SNS_SHIFT
            self.bits_typ = (bits & Frame.TYP_MASK) >> Frame.TYP_SHIFT
            if self.bits_sns >= Frame.SNS_NUM:
                self.index = 0
            if self.bits_typ >= Frame.TYP_NUM:
                self.index = 0

        elif pos == Frame.COUNT_POS:
            wordsize = Frame.sizeMap[self.bits_typ]
            self.count = ord(byte)
            # header + bits + count + payload + tail
            self.length = Frame.DATA_POS + self.count*wordsize + 1

        elif pos < self.length-1:
            self.binary += byte

        elif pos == self.length-1:
            if ord(byte) == Frame.TAIL:
                state = self.convert()
            else:
                state = None
            self.binary = b""
            self.index = 0
            return state

        else:
            # Read index ran past the frame: report it and resynchronise
            # instead of staying stuck in this branch.
            print('Error parsing')
            self.binary = b""
            self.index = 0

        return None

    # Convert frame to state
    def convert(self):
        """Decode the buffered payload and return it in a new State.

        The payload is unpacked as `count` little-endian words of the
        frame's data type and stored on the State attribute named after the
        frame's sensor type.
        """
        # Convert data
        fmt = Frame.fmtMap[self.bits_typ] * self.count
        sns = Frame.snsMap[self.bits_sns]
        self.values = unpack('<'+fmt, self.binary)

        # Print debug output once every 1000 samples per sensor.
        # NOTE(review): hexDump and fltDump are not defined in the visible
        # part of this file - presumably module-level helpers; confirm.
        self.sampleNum[self.bits_sns] += 1
        if self.sampleNum[self.bits_sns] == 1000:
            print('convert: %3s = \'%3s\'%%[%s] -> [%s]' %
                (sns, fmt, hexDump(self.binary), fltDump(self.values)))
            self.sampleNum[self.bits_sns] = 0

        # Create state
        state = State()
        setattr(state, sns, self.values)
        return state


class Device:
    # Constructors
    def __init__(self, config):
        print('Defice.__init__')
        self.config = config
        self.serial = None
        self.frame = Frame()

    # Methods
    def connect(self):
        print('Device.connect')
        buildingFrame = 0
        try:
            self.inbuf = []
            self.serial = Serial(self.config.device, \
                baudrate = self.config.baudrate, \
                parity   = self.config.parity, \
                bytesize = self.config.databits, \
                stopbits = self.config.stopbits, \
                timeout  = 0)
            for sns in self.config.rate:
                self.set_rate(sns, self.config.rate[sns])
            for sns in self.config.enable:
                self.set_enable(sns, self.config.enable[sns])
            self.serial.flushInput()
        except Exception as ex:
            # NOTE(review): the returned expression continues on the next source line.
            return
str(ex) def disconnect(self): print('Device.disconnect') if self.serial and self.serial.isOpen(): self.serial.close() def running(self): # isRunning if self.serial == None: return False if self.serial.isOpen() == False: return False return True def set_rate(self, sensor, interval): sns = Frame.findCode(Frame.snsMap, sensor) bits = (sns << Frame.SNS_SHIFT) | \ (Frame.CMD_RATE << Frame.CMD_SHIFT) self._write_binary('Bf', bits, interval) def set_enable(self, sensor, enabled): sns = Frame.findCode(Frame.snsMap, sensor) cmd = Frame.CMD_START if enabled else Frame.CMD_STOP bits = (sns << Frame.SNS_SHIFT) | \ (cmd << Frame.CMD_SHIFT) self._write_binary('B', bits) def process(self): if not self.running(): return [] items = [] while self.serial.inWaiting(): byte = self.serial.read() state = self.frame.parse(byte) if state: items.append(state) return items # auxilary function def frame_len(self, frame): if len(frame) < 3: return -1 dataType_snsType = ord(frame[1]) dataNum = ord(frame[2]) dataType = (dataType_snsType & Frame.TYP_MASK) >> Frame.TYP_SHIFT snsType = (dataType_snsType & Frame.SNS_MASK) >> Frame.SNS_SHIFT dataSize = Frame.sizeMap[dataType] return (dataSize*dataNum+4) def printHex(self, frame): frameLen = self.frame_len(frame) i = 0 while i