self.queue.append(state)
delta = datetime.utcnow() - self.last
if delta.total_seconds() > self.config.maxrate:
- self.flush()
+ return self.flush()
# Private methods
def flush(self):
- def isset(state, key):
- value = getattr(state, key)
- return any([x!=None for x in value])
- def get(items, key):
- points = [ Datapoint(s.time, getattr(s,key)) \
- for s in items if isset(s,key) ]
- stream = Datastream(id=key)
- stream.datapoints = points
- return stream
+ def split(items, limit):
+ split = {}
+ count = 0
+ sensors = {'acc': ['x','y','z'],
+ 'mag': ['x','y','z'],
+ 'tch': ['p','d'],
+ 'lgt': [''],
+ 'a2d': ['0','1','2','3','4','5']}
+ for state in items:
+ for sns in sensors:
+ data = getattr(state, sns)
+ for i in range(len(data)):
+ if data[i] == None:
+ continue
+ if count >= limit:
+ print('upload: cropping upload')
+ return split
+ chan = sns + sensors[sns][i]
+ if not split.has_key(chan):
+ split[chan] = []
+ point = Datapoint(state.time, data[i])
+ split[chan].append(point)
+ count += 1
+ return split
+
if not self.running() or len(self.queue) == 0:
return
- self.feed.datastreams = [
- get(self.queue, 'acc'),
- get(self.queue, 'mag'),
- get(self.queue, 'tch'),
- get(self.queue, 'lgt'),
- get(self.queue, 'a2d'),
- ]
- self.feed.update()
+ # Split data and limit to 500 points per upload
+ upload = split(self.queue, 500)
+
+ # Create datastreams
+ self.feed.datastreams = \
+ [ Datastream(id=chan, datapoints=upload[chan])
+ for chan in upload.keys() ]
+ # Clear queue
self.last = datetime.utcnow()
self.queue = []
+
+ # Upload it
+ try:
+ self.feed.update()
+ except Exception as ex:
+ return str(ex)