4 from serial import Serial
5 from datetime import datetime
8 class State: #information stored
17 self.time = datetime.utcnow()
64 snsMap = {SNS_ACC: 'acc',
70 cmdMap = {CMD_START, 'start',
74 sizeMap = {TYP_S8: 1, TYP_S16: 2, TYP_S32: 4,
75 TYP_U8: 1, TYP_U16: 2, TYP_U32: 4,
76 TYP_F32: 4, TYP_F64: 8}
78 fmtMap = {TYP_S8: 'b', TYP_S16: 'h', TYP_S32: 'i',
79 TYP_U8: 'B', TYP_U16: 'H', TYP_U32: 'I',
80 TYP_F32: 'f', TYP_F64: 'd'}
82 sampleNum= {SNS_ACC: 0,
89 index = 0 # read index
90 count = 0 # number of items in frame
91 length = 0 # length of frame (in bytes)
92 bits_sns = 0 # sensor type
93 bits_typ = 0 # data type
94 binary = "" # binary read-in
95 values = [] # converted numeric data
103 def findCode(dataMap, name):
105 if dataMap[code] == name:
107 print("[ERROR] No code found")
110 def parse(self, byte):
111 # save current pos and increment read index
112 # if we have an error we can reset index below
116 if pos == Frame.HEADER_POS:
117 if ord(byte) != Frame.HEADER:
119 #print('parse: header %02x' % ord(byte))
121 elif pos == Frame.BITS_POS:
122 self.bits_sns = (ord(byte) & Frame.SNS_MASK) >> Frame.SNS_SHIFT
123 self.bits_typ = (ord(byte) & Frame.TYP_MASK) >> Frame.TYP_SHIFT
124 if self.bits_sns >= Frame.SNS_NUM:
126 if self.bits_typ >= Frame.TYP_NUM:
128 #print('parse: bits sns=%d typ=%d' %
129 # (self.bits_sns, self.bits_typ))
131 elif pos == Frame.COUNT_POS:
132 wordsize = Frame.sizeMap[self.bits_typ]
133 self.count = ord(byte)
134 self.length = Frame.DATA_POS + self.count*wordsize + 1
135 #print('parse: count cnt=%d len=%d' %
136 # (self.count, self.length))
138 elif pos < self.length-1:
140 #print('parse: data %02x @%d' %
141 # (ord(byte), pos-Frame.DATA_POS))
143 elif pos == self.length-1:
144 #print('parse: tail %02x' % ord(byte))
145 if ord(byte) == Frame.TAIL:
146 state = self.convert()
153 elif pos > self.length-1:
154 print('Error parsing')
156 # Convert frame to state
159 fmt = Frame.fmtMap[self.bits_typ] * self.count
160 sns = Frame.snsMap[self.bits_sns]
161 self.values = unpack('<'+fmt, self.binary)
164 self.sampleNum[self.bits_sns] += 1
165 if self.sampleNum[self.bits_sns] == 1000:
166 print('convert: %3s = \'%3s\'%%[%s] -> [%s]' %
167 (sns, fmt, hexDump(self.binary), fltDump(self.values)))
168 self.sampleNum[self.bits_sns] = 0
172 setattr(state, sns, self.values)
177 def __init__(self, config):
178 print('Defice.__init__')
185 print('Device.connect')
189 self.serial = Serial(self.config.device, \
190 baudrate = self.config.baudrate, \
191 parity = self.config.parity, \
192 bytesize = self.config.databits, \
193 stopbits = self.config.stopbits, \
195 for sns in self.config.rate:
196 self.set_rate(sns, self.config.rate[sns])
197 for sns in self.config.enable:
198 self.set_enable(sns, self.config.enable[sns])
199 self.serial.flushInput()
200 except Exception as ex:
203 def disconnect(self):
204 print('Device.disconnect')
205 if self.serial and self.serial.isOpen():
208 def running(self): # isRunning
209 if self.serial == None:
211 if self.serial.isOpen() == False:
215 def set_rate(self, sensor, interval):
216 sns = Frame.findCode(Frame.snsMap, sensor)
217 bits = (sns << Frame.SNS_SHIFT) | \
218 (Frame.CMD_RATE << Frame.CMD_SHIFT)
219 self._write_binary('Bf', bits, interval)
221 def set_enable(self, sensor, enabled):
222 sns = Frame.findCode(Frame.snsMap, sensor)
223 cmd = Frame.CMD_START if enabled else Frame.CMD_STOP
224 bits = (sns << Frame.SNS_SHIFT) | \
225 (cmd << Frame.CMD_SHIFT)
226 self._write_binary('B', bits)
229 if not self.running():
233 while self.serial.inWaiting():
234 byte = self.serial.read()
235 state = self.frame.parse(byte)
242 def frame_len(self, frame):
245 dataType_snsType = ord(frame[1])
246 dataNum = ord(frame[2])
247 dataType = (dataType_snsType & Frame.TYP_MASK) >> Frame.TYP_SHIFT
248 snsType = (dataType_snsType & Frame.SNS_MASK) >> Frame.SNS_SHIFT
249 dataSize = Frame.sizeMap[dataType]
250 return (dataSize*dataNum+4)
252 def printHex(self, frame):
253 frameLen = self.frame_len(frame)
256 print(hex(ord(frame[i])))
260 def _write_binary(self, fmt, *args):
261 #print('Device._write_binary')
263 fmt = 'B' + fmt + 'B'
264 args = [Frame.HEADER] + list(args) + [Frame.TAIL]
265 frame = pack('<'+fmt, *args)
266 print('write: bin:[' + hexDump(frame) + ']')
267 self.serial.write(frame)
272 digits = ['%02x' % ord(byte) for byte in frame]
273 return ' '.join(digits)
276 digits = ['%5.2f' % flt for flt in data]
277 return ' '.join(digits)