from re import compile
from serial import Serial
from datetime import datetime
+from struct import *
-class State:
- acc = [None]*3
- mag = [None]*3
- touch = [None]*2
- light = [None]*1
- a2d = [None]*6
- time = None
+class State: #information stored
+ acc = [None]*3
+ mag = [None]*3
+ lgt = [None]*1
+ tch = [None]*2
+ a2d = [None]*6
+ time = None
def __init__(self):
self.time = datetime.utcnow()
-class Device:
- # Attributes
- port = "/dev/ttyACM0"
+class Frame:
+ # Sensor types
+ SNS_ACC = 0x00
+ SNS_MAG = 0x01
+ SNS_LGT = 0x02
+ SNS_TCH = 0x03
+ SNS_A2D = 0x04
+
+ SNS_NUM = 0x05
+ SNS_SHIFT = 4
+ SNS_MASK = 0xF0
+
+ # Data types
+ TYP_S8 = 0
+ TYP_S16 = 1
+ TYP_S32 = 2
+ TYP_U8 = 3
+ TYP_U16 = 4
+ TYP_U32 = 5
+ TYP_F32 = 6
+ TYP_F64 = 7
+
+ TYP_NUM = 8
+ TYP_SHIFT = 0
+ TYP_MASK = 0x0F
+
+ # Command codes
+ CMD_STOP = 0
+ CMD_START = 1
+ CMD_RATE = 2
+
+ CMD_NUM = 3
+ CMD_SHIFT = 0
+ CMD_MASK = 0x0F
+
+ # Frame information
+ HEADER = 0x02
+ TAIL = 0x0A
+
+ HEADER_POS = 0
+ BITS_POS = 1
+ COUNT_POS = 2
+ DATA_POS = 3
+
+ # Maps
+ snsMap = {SNS_ACC: 'acc',
+ SNS_MAG: 'mag',
+ SNS_LGT: 'lgt',
+ SNS_TCH: 'tch',
+ SNS_A2D: 'a2d'}
+
+ cmdMap = {CMD_START, 'start',
+ CMD_STOP, 'stop',
+ CMD_RATE, 'rate'}
+
+ sizeMap = {TYP_S8: 1, TYP_S16: 2, TYP_S32: 4,
+ TYP_U8: 1, TYP_U16: 2, TYP_U32: 4,
+ TYP_F32: 4, TYP_F64: 8}
+
+ fmtMap = {TYP_S8: 'b', TYP_S16: 'h', TYP_S32: 'i',
+ TYP_U8: 'B', TYP_U16: 'H', TYP_U32: 'I',
+ TYP_F32: 'f', TYP_F64: 'd'}
+
+ sampleNum= {SNS_ACC: 0,
+ SNS_MAG: 0,
+ SNS_LGT: 0,
+ SNS_TCH: 0,
+ SNS_A2D: 0}
+
+ # Parser data
+ index = 0 # read index
+ count = 0 # number of items in frame
+ length = 0 # length of frame (in bytes)
+ bits_sns = 0 # sensor type
+ bits_typ = 0 # data type
+ binary = "" # binary read-in
+ values = [] # converted numeric data
+
+ # Constructor
+ def __init__(self):
+ pass
+
+ # Converters
+ @staticmethod
+ def findCode(dataMap, name):
+ for code in dataMap:
+ if dataMap[code] == name:
+ return code
+ print("[ERROR] No code found")
+
+ # Parse frame
+ def parse(self, byte):
+ # save current pos and increment read index
+ # if we have an error we cna reset index below
+ pos = self.index
+ self.index += 1
+
+ if pos == Frame.HEADER_POS:
+ if ord(byte) != Frame.HEADER:
+ self.index = 0
+ #print('parse: header %02x' % ord(byte))
+
+ elif pos == Frame.BITS_POS:
+ self.bits_sns = (ord(byte) & Frame.SNS_MASK) >> Frame.SNS_SHIFT
+ self.bits_typ = (ord(byte) & Frame.TYP_MASK) >> Frame.TYP_SHIFT
+ if self.bits_sns >= Frame.SNS_NUM:
+ self.index = 0
+ if self.bits_typ >= Frame.TYP_NUM:
+ self.index = 0
+ #print('parse: bits sns=%d typ=%d' %
+ # (self.bits_sns, self.bits_typ))
+
+ elif pos == Frame.COUNT_POS:
+ wordsize = Frame.sizeMap[self.bits_typ]
+ self.count = ord(byte)
+ self.length = Frame.DATA_POS + self.count*wordsize + 1
+ #print('parse: count cnt=%d len=%d' %
+ # (self.count, self.length))
+
+ elif pos < self.length-1:
+ self.binary += byte
+ #print('parse: data %02x @%d' %
+ # (ord(byte), pos-Frame.DATA_POS))
+ elif pos == self.length-1:
+ #print('parse: tail %02x' % ord(byte))
+ if ord(byte) == Frame.TAIL:
+ state = self.convert()
+ else:
+ state = None
+ self.binary = ""
+ self.index = 0
+ return state
+
+ elif pos > self.length-1:
+ print('Error parsing')
+
+ # Convert frame to state
+ def convert(self):
+ # Covnert data
+ fmt = Frame.fmtMap[self.bits_typ] * self.count
+ sns = Frame.snsMap[self.bits_sns]
+ self.values = unpack('<'+fmt, self.binary)
+
+ # Print debug output
+ self.sampleNum[self.bits_sns] += 1
+ #if self.sampleNum[self.bits_sns] == 1000:
+ if self.sampleNum[self.bits_sns] == 1000:
+ print('convert: %3s = \'%3s\'%%[%s] -> [%s]' %
+ (sns, fmt, hexDump(self.binary), fltDump(self.values)))
+ self.sampleNum[self.bits_sns] = 0
+
+ # Create state
+ state = State()
+ setattr(state, sns, self.values)
+ return state
+
+class Device:
# Constructors
def __init__(self, config):
+ print('Defice.__init__')
self.config = config
self.serial = None
+ self.frame = Frame()
# Methods
def connect(self):
+ print('Device.connect')
+ buildingFrame = 0
try:
self.inbuf = []
- self.serial = Serial(self.port, \
+ self.serial = Serial(self.config.device, \
baudrate = self.config.baudrate, \
parity = self.config.parity, \
bytesize = self.config.databits, \
stopbits = self.config.stopbits, \
timeout = 0)
- self.control()
+ for sns in self.config.rate:
+ self.set_rate(sns, self.config.rate[sns])
+ for sns in self.config.enable:
+ self.set_enable(sns, self.config.enable[sns])
self.serial.flushInput()
except Exception as ex:
return str(ex)
def disconnect(self):
+ print('Device.disconnect')
if self.serial and self.serial.isOpen():
self.serial.close()
- def running(self):
+ def running(self): # isRunning
if self.serial == None:
return False
if self.serial.isOpen() == False:
return False
return True
- def control(self):
- for key in list(self.config.enable.keys()):
- state = self.config.enable[key]
- rate = self.config.rate[key]
- cmd = 'start' if state else 'stop'
- self._write_ascii('%s %s' % (cmd, key))
- self._write_ascii('set %s int %d' % (key, rate))
+ def set_rate(self, sensor, interval):
+ sns = Frame.findCode(Frame.snsMap, sensor)
+ bits = (sns << Frame.SNS_SHIFT) | \
+ (Frame.CMD_RATE << Frame.CMD_SHIFT)
+ self._write_binary('Bf', bits, interval)
+
+ def set_enable(self, sensor, enabled):
+ sns = Frame.findCode(Frame.snsMap, sensor)
+ cmd = Frame.CMD_START if enabled else Frame.CMD_STOP
+ bits = (sns << Frame.SNS_SHIFT) | \
+ (cmd << Frame.CMD_SHIFT)
+ self._write_binary('B', bits)
def process(self):
- items = []
- count = 0
- limit = 1000
if not self.running():
- return items
- while self.serial.readable():
- try:
- char = self.serial.read().decode()
- except Exception as ex:
- char = ''
- if len(char) == 0:
- break
- if char == '\r' or char == '\n':
- if len(self.inbuf) == 0:
- continue
- line = "".join(self.inbuf)
- print('read: [' + line + ']')
- item = self._parse_ascii(line)
- items.append(item)
- self.inbuf = []
- else:
- self.inbuf.append(char)
- if count > limit:
- print("Error: exceeded read limit")
- break
- count += 1
+ return []
+
+ items = []
+ while self.serial.inWaiting():
+ byte = self.serial.read()
+ state = self.frame.parse(byte)
+ if state:
+ items.append(state)
return items
+
+ # auxilary function
+ def frame_len(self, frame):
+ if len(frame) < 3:
+ return -1
+ dataType_snsType = ord(frame[1])
+ dataNum = ord(frame[2])
+ dataType = (dataType_snsType & Frame.TYP_MASK) >> Frame.TYP_SHIFT
+ snsType = (dataType_snsType & Frame.SNS_MASK) >> Frame.SNS_SHIFT
+ dataSize = Frame.sizeMap[dataType]
+ return (dataSize*dataNum+4)
+
+ def printHex(self, frame):
+ frameLen = self.frame_len(frame)
+ i = 0
+ while i<frameLen:
+ print(hex(ord(frame[i])))
+ i+=1
+
# Private methods
- def _write_ascii(self, line):
+ def _write_binary(self, fmt, *args):
+ #print('Device._write_binary')
if self.serial:
- print('write: [' + line + ']')
- data = bytes(line+'\n', 'UTF-8')
- self.serial.write(data)
+ fmt = 'B' + fmt + 'B'
+ args = [Frame.HEADER] + list(args) + [Frame.TAIL]
+ frame = pack('<'+fmt, *args)
+ print('write: bin:[' + hexDump(frame) + ']')
+ self.serial.write(frame)
self.serial.flush()
time.sleep(0.1)
- def _parse_ascii(self, line):
- acc_re = compile("\[ACC\] accX=(.*) accY=(.*) accZ=(.*)")
- mag_re = compile("\[MAG\] magX=(.*) magY=(.*) magZ=(.*)")
- lgt_re = compile("\[LGT\] Light Intensity=(.*)")
- tch_re = compile("\[TCH\] Force=(.*) Distance=(.*)")
- a2d_re = compile("\[A2D\] ...")
-
- acc_m = acc_re.match(line)
- mag_m = mag_re.match(line)
- lgt_m = lgt_re.match(line)
- tch_m = tch_re.match(line)
- a2d_m = a2d_re.match(line)
+def hexDump(frame):
+ digits = ['%02x' % ord(byte) for byte in frame]
+ return ' '.join(digits)
- state = State()
- if acc_m:
- state.acc[0] = float(acc_m.group(1))
- state.acc[1] = float(acc_m.group(2))
- state.acc[2] = float(acc_m.group(3))
- if mag_m:
- state.mag[0] = float(mag_m.group(1))
- state.mag[1] = float(mag_m.group(2))
- state.mag[2] = float(mag_m.group(3))
- if lgt_m:
- state.light[0] = float(lgt_m.group(1))
- if tch_m:
- state.touch[0] = float(tch_m.group(1))
- if a2d_m:
- state.a2d[0] = float(tch_m.group(1))
- state.a2d[1] = float(tch_m.group(2))
- state.a2d[2] = float(tch_m.group(3))
- state.a2d[3] = float(tch_m.group(4))
- state.a2d[4] = float(tch_m.group(5))
- state.a2d[5] = float(tch_m.group(6))
-
- return state
+def fltDump(data):
+ digits = ['%5.2f' % flt for flt in data]
+ return ' '.join(digits)