]> Pileus Git - ~andy/csm213a-hw/blobdiff - vis/logger.py
Fix whitespace error
[~andy/csm213a-hw] / vis / logger.py
index 84b6e0d5e24d3da740dd75287bddfb84a5bd3065..ada465c57db40211d5bf495a6da2e3ff7fba88de 100644 (file)
+import sys
+
 from datetime import datetime
 
 from xively import XivelyAPIClient
 from xively import Datastream
 from xively import Datapoint
 
+if sys.version_info<(3,0,0):
+       from Queue   import Queue
+       from thread  import get_ident, start_new_thread
+else:
+       from queue   import Queue
+       from _thread import get_ident, start_new_thread
 
 class Logger:
-       api   = None
-       feed  = None
-       queue = []
-       last  = 0
-
-       # Methods
+       # Constructor
        def __init__(self, config):
                self.config = config
+               self.api    = None
+               self.feed   = None
+               self.last   = 0
+               self.limit  = 500
+               self.states = []
+               self.queue  = Queue()
+               self.thread = None
+               self.error  = ''
 
        # Methods
        def connect(self):
-               self.api   = XivelyAPIClient(self.config.apikey)
-               self.feed  = self.api.feeds.get(self.config.feedid)
-               self.last  = datetime.utcnow()
-               self.queue = []
+               if not self.running():
+                       self.last = datetime.utcnow()
+                       start_new_thread(self.task, ())
 
        def disconnect(self):
-               self.api   = None
-               self.feed  = None
+               if self.running():
+                       self.thread = None
+                       self.queue.put([])
 
        def running(self):
-               if not self.api or not self.feed:
-                       return False
-               return True
+               return bool(self.thread)
 
        def append(self, state):
-               if not self.feed:
+               if not self.running():
                        return
-               self.queue.append(state)
+               self.states.append(state)
                delta = datetime.utcnow() - self.last
                if delta.total_seconds() > self.config.maxrate:
-                       self.flush()
+                       self.queue.put(self.states)
+                       self.last   = datetime.utcnow()
+                       self.states = []
 
        # Private methods
-       def flush(self):
-               def isset(state, key):
-                       value = getattr(state, key)
-                       return any([x!=None for x in value])
-               def get(items, key):
-                       points = [ Datapoint(s.time, getattr(s,key)) \
-                               for s in items if isset(s,key) ]
-                       stream = Datastream(id=key)
-                       stream.datapoints = points
-                       return stream
-
-               if not self.running() or len(self.queue) == 0:
-                       return
+       def split(self, items, limit):
+               split   = {}
+               count   = 0
+               sensors = {'acc': ['x','y','z'],
+                          'mag': ['x','y','z'],
+                          'tch': ['p','d'],
+                          'lgt': [''],
+                          'a2d': ['0','1','2','3','4','5']}
+               for state in items:
+                       for sns in sensors:
+                               data = getattr(state, sns)
+                               for i in range(len(data)):
+                                       if data[i] == None:
+                                               continue
+                                       if count >= limit:
+                                               print('upload: cropping upload')
+                                               return split
+                                       chan  = sns + sensors[sns][i]
+                                       if not split.has_key(chan):
+                                               split[chan] = []
+                                       point = Datapoint(state.time, data[i])
+                                       split[chan].append(point)
+                                       count += 1
+               return split
+
+       def task(self):
+               print('thread - start')
+               self.thread = get_ident()
+
+               # Setup connection
+               api  = XivelyAPIClient(self.config.apikey)
+               feed = api.feeds.get(self.config.feedid)
+
+               # Process requests from main
+               while self.running():
+                       # Pull data from queue (blocking)
+                       states = self.queue.get()
+                       print('thread - got %d states' % len(states))
+
+                       # Make sure it's valid
+                       if len(states) == 0:
+                               continue
+
+                       # Split data and limit to 500 points per upload
+                       upload = self.split(states, self.limit)
+
+                       # Create datastreams
+                       feed.datastreams = \
+                               [ Datastream(id=chan, datapoints=upload[chan])
+                                 for chan in upload.keys() ]
+
+                       # Upload it
+                       try:
+                               print('thread - upload start')
+                               feed.update()
+                               print('thread - upload done')
+                       except Exception as ex:
+                               self.error = str(ex)
+                               break
 
-               self.feed.datastreams = [
-                       get(self.queue, 'acc'),
-                       get(self.queue, 'mag'),
-                       get(self.queue, 'touch'),
-                       get(self.queue, 'light'),
-                       get(self.queue, 'a2d'),
-               ]
-               self.feed.update()
-
-               self.last  = datetime.utcnow()
-               self.queue = []
+               # Notify main that we're done
+               print('thread - exit')
+               self.thread = None