]> Pileus Git - ~andy/csm213a-hw/blobdiff - vis/logger.py
Split start and period set functions
[~andy/csm213a-hw] / vis / logger.py
index ffd79038781a5dfb9f89e328152aa0d9c06c76bd..ada465c57db40211d5bf495a6da2e3ff7fba88de 100644 (file)
@@ -1,6 +1,116 @@
+import sys
+
+from datetime import datetime
+
+from xively import XivelyAPIClient
+from xively import Datastream
+from xively import Datapoint
+
+if sys.version_info<(3,0,0):
+       from Queue   import Queue
+       from thread  import get_ident, start_new_thread
+else:
+       from queue   import Queue
+       from _thread import get_ident, start_new_thread
+
 class Logger:
+       # Constructor
        def __init__(self, config):
                self.config = config
+               self.api    = None
+               self.feed   = None
+               self.last   = 0
+               self.limit  = 500
+               self.states = []
+               self.queue  = Queue()
+               self.thread = None
+               self.error  = ''
 
+       # Methods
        def connect(self):
-               pass
+               if not self.running():
+                       self.last = datetime.utcnow()
+                       start_new_thread(self.task, ())
+
+       def disconnect(self):
+               if self.running():
+                       self.thread = None
+                       self.queue.put([])
+
+       def running(self):
+               return bool(self.thread)
+
+       def append(self, state):
+               if not self.running():
+                       return
+               self.states.append(state)
+               delta = datetime.utcnow() - self.last
+               if delta.total_seconds() > self.config.maxrate:
+                       self.queue.put(self.states)
+                       self.last   = datetime.utcnow()
+                       self.states = []
+
+       # Private methods
+       def split(self, items, limit):
+               split   = {}
+               count   = 0
+               sensors = {'acc': ['x','y','z'],
+                          'mag': ['x','y','z'],
+                          'tch': ['p','d'],
+                          'lgt': [''],
+                          'a2d': ['0','1','2','3','4','5']}
+               for state in items:
+                       for sns in sensors:
+                               data = getattr(state, sns)
+                               for i in range(len(data)):
+                                       if data[i] == None:
+                                               continue
+                                       if count >= limit:
+                                               print('upload: cropping upload')
+                                               return split
+                                       chan  = sns + sensors[sns][i]
+                                       if not split.has_key(chan):
+                                               split[chan] = []
+                                       point = Datapoint(state.time, data[i])
+                                       split[chan].append(point)
+                                       count += 1
+               return split
+
+       def task(self):
+               print('thread - start')
+               self.thread = get_ident()
+
+               # Setup connection
+               api  = XivelyAPIClient(self.config.apikey)
+               feed = api.feeds.get(self.config.feedid)
+
+               # Process requests from main
+               while self.running():
+                       # Pull data from queue (blocking)
+                       states = self.queue.get()
+                       print('thread - got %d states' % len(states))
+
+                       # Make sure it's valid
+                       if len(states) == 0:
+                               continue
+
+                       # Split data and limit to 500 points per upload
+                       upload = self.split(states, self.limit)
+
+                       # Create datastreams
+                       feed.datastreams = \
+                               [ Datastream(id=chan, datapoints=upload[chan])
+                                 for chan in upload.keys() ]
+
+                       # Upload it
+                       try:
+                               print('thread - upload start')
+                               feed.update()
+                               print('thread - upload done')
+                       except Exception as ex:
+                               self.error = str(ex)
+                               break
+
+               # Notify main that we're done
+               print('thread - exit')
+               self.thread = None