From ef6439548e2ebb0d33ffcd3c297d845b467422be Mon Sep 17 00:00:00 2001 From: Andy Spencer Date: Fri, 14 Feb 2014 07:12:40 +0000 Subject: [PATCH] Split Xively streams and debug upload --- vis/logger.py | 62 ++++++++++++++++++++++++++++++--------------------- vis/test.py | 52 +++++++++++++++++++++++++++--------------- 2 files changed, 70 insertions(+), 44 deletions(-) diff --git a/vis/logger.py b/vis/logger.py index a1cccf7..80cbd3a 100644 --- a/vis/logger.py +++ b/vis/logger.py @@ -41,39 +41,49 @@ class Logger: # Private methods def flush(self): - def isset(state, key): - value = getattr(state, key) - return any([x!=None for x in value]) - def get(items, key): - points = [ Datapoint(s.time, getattr(s,key)) \ - for s in items if isset(s,key) ] - stream = Datastream(id=key) - stream.datapoints = points - return stream + def split(items, limit): + split = {} + count = 0 + sensors = {'acc': ['x','y','z'], + 'mag': ['x','y','z'], + 'tch': ['p','d'], + 'lgt': [''], + 'a2d': ['0','1','2','3','4','5']} + for state in items: + for sns in sensors: + data = getattr(state, sns) + for i in range(len(data)): + if data[i] == None: + continue + if count >= limit: + print('upload: cropping upload') + return split + chan = sns + sensors[sns][i] + if not split.has_key(chan): + split[chan] = [] + point = Datapoint(state.time, data[i]) + split[chan].append(point) + count += 1 + return split + if not self.running() or len(self.queue) == 0: return - # Limit data to 500 samples per upload - limit = 500 - upload = self.queue - if len(upload) > limit: - print("upload: cropping - %d -> %d" % - (len(upload), limit)) - upload = upload[0:limit-1] + # Split data and limit to 500 points per upload + upload = split(self.queue, 500) + + # Create datastreams + self.feed.datastreams = \ + [ Datastream(id=chan, datapoints=upload[chan]) + for chan in upload.keys() ] - self.feed.datastreams = [ - get(upload, 'acc'), - get(upload, 'mag'), - get(upload, 'tch'), - get(upload, 'lgt'), - get(upload, 'a2d'), - ] + # Clear queue + self.last = datetime.utcnow() + self.queue = [] + # Upload it try: self.feed.update() except Exception as ex: return str(ex) - - self.last = datetime.utcnow() - self.queue = [] diff --git a/vis/test.py b/vis/test.py index abb4d6f..c7ce9ca 100644 --- a/vis/test.py +++ b/vis/test.py @@ -71,11 +71,17 @@ class TestXively: api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5' # Times - time0 = datetime.datetime(2014,2,3, 3,0,0) - time1 = datetime.datetime(2014,2,3, 3,1,0) - time2 = datetime.datetime(2014,2,3, 3,2,0) - time3 = datetime.datetime(2014,2,3, 3,3,0) - time4 = datetime.datetime(2014,2,3, 3,4,0) + now = datetime.datetime.utcnow(); + time0 = now-datetime.timedelta(0,0*60); # datetime.datetime(2014,2,3, 3,0,0) + time1 = now-datetime.timedelta(0,1*60); # datetime.datetime(2014,2,3, 3,1,0) + time2 = now-datetime.timedelta(0,2*60); # datetime.datetime(2014,2,3, 3,2,0) + time3 = now-datetime.timedelta(0,3*60); # datetime.datetime(2014,2,3, 3,3,0) + time4 = now-datetime.timedelta(0,4*60); # datetime.datetime(2014,2,3, 3,4,0) + time5 = now-datetime.timedelta(0,5*60); # datetime.datetime(2014,2,3, 3,4,0) + time6 = now-datetime.timedelta(0,6*60); # datetime.datetime(2014,2,3, 3,4,0) + time7 = now-datetime.timedelta(0,7*60); # datetime.datetime(2014,2,3, 3,4,0) + time8 = now-datetime.timedelta(0,8*60); # datetime.datetime(2014,2,3, 3,4,0) + time9 = now-datetime.timedelta(0,9*60); # datetime.datetime(2014,2,3, 3,4,0) # Constructor def __init__(self): @@ -84,22 +90,32 @@ class TestXively: # Update the feed def send(self): - acc = Datastream(id='acc') + acc = Datastream(id='test_acc') acc.datapoints = [ - Datapoint(self.time0, [1, 1, 1]), - Datapoint(self.time1, [2, 2, 2]), - Datapoint(self.time2, [3, 3, 3]), - Datapoint(self.time3, [4, 4, 4]), - Datapoint(self.time4, [5, 5, 5]), + Datapoint(self.time0, [00, 1, 1]), + Datapoint(self.time1, [10, 2, 2]), + Datapoint(self.time2, [20, 3, 3]), + Datapoint(self.time3, [30, 4, 4]), + Datapoint(self.time4, [40, 5, 5]), + Datapoint(self.time5, [50, 5, 5]), + Datapoint(self.time6, [60, 5, 5]), + Datapoint(self.time7, [70, 5, 5]), + Datapoint(self.time8, [80, 5, 5]), + Datapoint(self.time9, [90, 5, 5]), ] - mag = Datastream(id='mag') + mag = Datastream(id='test_mag') mag.datapoints = [ - Datapoint(self.time0, [1, 1, 1]), - Datapoint(self.time1, [2, 2, 2]), - Datapoint(self.time2, [3, 3, 3]), - Datapoint(self.time3, [4, 4, 4]), - Datapoint(self.time4, [5, 5, 5]), + Datapoint(self.time0, 00), + Datapoint(self.time1, 10), + Datapoint(self.time2, 20), + Datapoint(self.time3, 30), + Datapoint(self.time4, 40), + Datapoint(self.time5, 50), + Datapoint(self.time6, 60), + Datapoint(self.time7, 70), + Datapoint(self.time8, 80), + Datapoint(self.time9, 90), ] self.feed.datastreams = [ acc, mag ] @@ -107,7 +123,7 @@ class TestXively: # Read it back def recv(self): - stream = self.feed.datastreams.get('acc') + stream = self.feed.datastreams.get('test_acc') points = stream.datapoints.history( \ start=self.time0, end=self.time4, interval=0) -- 2.43.2