from re import compile from serial import Serial from datetime import datetime class State: acc = [None]*3 mag = [None]*3 touch = [None]*2 light = [None]*1 a2d = [None]*6 time = None def __init__(self): self.time = datetime.utcnow() class Device: # Attributes port = "/dev/ttyACM0" # Constructors def __init__(self, config): self.config = config self.serial = None # Methods def connect(self): try: self.inbuf = [] self.serial = Serial(self.port, \ baudrate = self.config.baudrate, \ parity = self.config.parity, \ bytesize = self.config.databits, \ stopbits = self.config.stopbits, \ timeout = 0) self.serial.flushInput() except Exception as ex: return str(ex) def disconnect(self): if self.serial and self.serial.isOpen(): self.serial.close() def running(self): if self.serial == None: return False if self.serial.isOpen() == False: return False return True def control(self): pass def process(self): items = [] count = 0 limit = 1000 if not self.running(): return items while self.serial.readable(): try: char = self.serial.read().decode() except Exception as ex: char = '' if len(char) == 0: break if char == '\r' or char == '\n': if len(self.inbuf) == 0: continue line = "".join(self.inbuf) item = self._parse_ascii(line) items.append(item) self.inbuf = [] else: self.inbuf.append(char) if count > limit: print("Error: exceeded read limit") break count += 1 return items # Private methods def _parse_ascii(self, line): acc_re = compile("\[ACC\] accX=(.*) accY=(.*) accZ=(.*)") mag_re = compile("\[MAG\] magX=(.*) magY=(.*) magZ=(.*)") lgt_re = compile("\[LGT\] Light Intensity=(.*)") tch_re = compile("\[TCH\] Force=(.*) Distance=(.*)") a2d_re = compile("\[A2D\] ...") acc_m = acc_re.match(line) mag_m = mag_re.match(line) lgt_m = lgt_re.match(line) tch_m = tch_re.match(line) a2d_m = a2d_re.match(line) state = State() if acc_m: state.acc[0] = float(acc_m.group(1)) state.acc[1] = float(acc_m.group(2)) state.acc[2] = float(acc_m.group(3)) if mag_m: state.mag[0] = float(mag_m.group(1)) state.mag[1] = float(mag_m.group(2)) state.mag[2] = float(mag_m.group(3)) if lgt_m: state.light[0] = float(lgt_m.group(1)) if tch_m: state.touch[0] = float(tch_m.group(1)) if a2d_m: state.a2d[0] = float(tch_m.group(1)) state.a2d[1] = float(tch_m.group(2)) state.a2d[2] = float(tch_m.group(3)) state.a2d[3] = float(tch_m.group(4)) state.a2d[4] = float(tch_m.group(5)) state.a2d[5] = float(tch_m.group(6)) return state