-from serial import Serial
+import time
+
+from re import compile
+from serial import Serial
+from datetime import datetime
+
+class State:
+ acc = [None]*3
+ mag = [None]*3
+ touch = [None]*2
+ light = [None]*1
+ a2d = [None]*6
+ time = None
+
+ def __init__(self):
+ self.time = datetime.utcnow()
class Device:
# Attributes
# Constructors
def __init__(self, config):
self.config = config
+ self.serial = None
# Methods
def connect(self):
- self.conn = Serial(self.port, \
- baudrate = self.config.baudrate, \
- parity = self.config.parity, \
- bytesize = self.config.databits, \
- stopbits = self.config.stopbits)
+ try:
+ self.inbuf = []
+ self.serial = Serial(self.port, \
+ baudrate = self.config.baudrate, \
+ parity = self.config.parity, \
+ bytesize = self.config.databits, \
+ stopbits = self.config.stopbits, \
+ timeout = 0)
+ self.control()
+ self.serial.flushInput()
+ except Exception as ex:
+ return str(ex)
+
+ def disconnect(self):
+ if self.serial and self.serial.isOpen():
+ self.serial.close()
+
+ def running(self):
+ if self.serial == None:
+ return False
+ if self.serial.isOpen() == False:
+ return False
+ return True
+
+ def control(self):
+ for key in list(self.config.enable.keys()):
+ state = self.config.enable[key]
+ rate = self.config.rate[key]
+ cmd = 'start' if state else 'stop'
+ self._write_ascii('%s %s' % (cmd, key))
+ self._write_ascii('set %s int %d' % (key, rate))
+
+ def process(self):
+ items = []
+ count = 0
+ limit = 1000
+ if not self.running():
+ return items
+ while self.serial.readable():
+ try:
+ char = self.serial.read().decode()
+ except Exception as ex:
+ char = ''
+ if len(char) == 0:
+ break
+ if char == '\r' or char == '\n':
+ if len(self.inbuf) == 0:
+ continue
+ line = "".join(self.inbuf)
+ print('read: [' + line + ']')
+ item = self._parse_ascii(line)
+ items.append(item)
+ self.inbuf = []
+ else:
+ self.inbuf.append(char)
+ if count > limit:
+ print("Error: exceeded read limit")
+ break
+ count += 1
+ return items
+
+ # Private methods
+ def _write_ascii(self, line):
+ if self.serial:
+ print('write: [' + line + ']')
+ data = bytes(line+'\n', 'UTF-8')
+ self.serial.write(data)
+ self.serial.flush()
+ time.sleep(0.1)
+
+ def _parse_ascii(self, line):
+ acc_re = compile("\[ACC\] accX=(.*) accY=(.*) accZ=(.*)")
+ mag_re = compile("\[MAG\] magX=(.*) magY=(.*) magZ=(.*)")
+ lgt_re = compile("\[LGT\] Light Intensity=(.*)")
+ tch_re = compile("\[TCH\] Force=(.*) Distance=(.*)")
+ a2d_re = compile("\[A2D\] ...")
+
+ acc_m = acc_re.match(line)
+ mag_m = mag_re.match(line)
+ lgt_m = lgt_re.match(line)
+ tch_m = tch_re.match(line)
+ a2d_m = a2d_re.match(line)
+
+ state = State()
+ if acc_m:
+ state.acc[0] = float(acc_m.group(1))
+ state.acc[1] = float(acc_m.group(2))
+ state.acc[2] = float(acc_m.group(3))
+ if mag_m:
+ state.mag[0] = float(mag_m.group(1))
+ state.mag[1] = float(mag_m.group(2))
+ state.mag[2] = float(mag_m.group(3))
+ if lgt_m:
+ state.light[0] = float(lgt_m.group(1))
+ if tch_m:
+ state.touch[0] = float(tch_m.group(1))
+ if a2d_m:
+ state.a2d[0] = float(tch_m.group(1))
+ state.a2d[1] = float(tch_m.group(2))
+ state.a2d[2] = float(tch_m.group(3))
+ state.a2d[3] = float(tch_m.group(4))
+ state.a2d[4] = float(tch_m.group(5))
+ state.a2d[5] = float(tch_m.group(6))
+ return state