]> Pileus Git - ~andy/csm213a-hw/commitdiff
Convert logger to use threads
authorAndy Spencer <andy753421@gmail.com>
Fri, 14 Feb 2014 08:07:48 +0000 (08:07 +0000)
committerAndy Spencer <andy753421@gmail.com>
Fri, 14 Feb 2014 08:07:48 +0000 (08:07 +0000)
vis/logger.py

index 80cbd3af8a07c5232412d9ad759a084ecaf53f1c..97a7a3b2179f8f17284ee618578d3fa667e5c2e3 100644 (file)
@@ -1,4 +1,6 @@
 from datetime import datetime
+from Queue    import Queue
+from thread   import get_ident, start_new_thread
 
 from xively import XivelyAPIClient
 from xively import Datastream
@@ -6,84 +8,103 @@ from xively import Datapoint
 
 
 class Logger:
-       api   = None
-       feed  = None
-       queue = []
-       last  = 0
-
        # Constructor
        def __init__(self, config):
                self.config = config
+               self.api    = None
+               self.feed   = None
+               self.last   = 0
+               self.limit  = 500
+               self.states = []
+               self.queue  = Queue()
+               self.thread = None
+               self.error  = ''
 
        # Methods
        def connect(self):
-               self.api   = XivelyAPIClient(self.config.apikey)
-               self.feed  = self.api.feeds.get(self.config.feedid)
-               self.last  = datetime.utcnow()
-               self.queue = []
+               if not self.running():
+                       self.last = datetime.utcnow()
+                       start_new_thread(self.task, ())
 
        def disconnect(self):
-               self.api   = None
-               self.feed  = None
+               if self.running():
+                       self.thread = None
+                       self.queue.put([])
 
        def running(self):
-               if not self.api or not self.feed:
-                       return False
-               return True
+               return bool(self.thread)
 
        def append(self, state):
-               if not self.feed:
+               if not self.running():
                        return
-               self.queue.append(state)
+               self.states.append(state)
                delta = datetime.utcnow() - self.last
                if delta.total_seconds() > self.config.maxrate:
-                       return self.flush()
+                       self.queue.put(self.states)
+                       self.last   = datetime.utcnow()
+                       self.states = []
 
        # Private methods
-       def flush(self):
-               def split(items, limit):
-                       split   = {}
-                       count   = 0
-                       sensors = {'acc': ['x','y','z'],
-                                  'mag': ['x','y','z'],
-                                  'tch': ['p','d'],
-                                  'lgt': [''],
-                                  'a2d': ['0','1','2','3','4','5']}
-                       for state in items:
-                               for sns in sensors:
-                                       data = getattr(state, sns)
-                                       for i in range(len(data)):
-                                               if data[i] == None:
-                                                       continue
-                                               if count >= limit:
-                                                       print('upload: cropping upload')
-                                                       return split
-                                               chan  = sns + sensors[sns][i]
-                                               if not split.has_key(chan):
-                                                       split[chan] = []
-                                               point = Datapoint(state.time, data[i])
-                                               split[chan].append(point)
-                                               count += 1
-                       return split
-
-
-               if not self.running() or len(self.queue) == 0:
-                       return
+       def split(self, items, limit):
+               split   = {}
+               count   = 0
+               sensors = {'acc': ['x','y','z'],
+                          'mag': ['x','y','z'],
+                          'tch': ['p','d'],
+                          'lgt': [''],
+                          'a2d': ['0','1','2','3','4','5']}
+               for state in items:
+                       for sns in sensors:
+                               data = getattr(state, sns)
+                               for i in range(len(data)):
+                                       if data[i] == None:
+                                               continue
+                                       if count >= limit:
+                                               print('upload: cropping upload')
+                                               return split
+                                       chan  = sns + sensors[sns][i]
+                                       if not split.has_key(chan):
+                                               split[chan] = []
+                                       point = Datapoint(state.time, data[i])
+                                       split[chan].append(point)
+                                       count += 1
+               return split
+
+       def task(self):
+               print('thread - start')
+               self.thread = get_ident()
+
+               # Setup connection
+               api  = XivelyAPIClient(self.config.apikey)
+               feed = api.feeds.get(self.config.feedid)
+
+               # Process requests from main
+               while self.running():
+                       # Pull data from queue (blocking)
+                       states = self.queue.get()
+                       print('thread - got %d states' % len(states))
+
+                       # Make sure it's valid
+                       if len(states) == 0:
+                               continue
 
-               # Split data and limit to 500 points per upload
-               upload = split(self.queue, 500)
+                       # Split data and limit to 500 points per upload
+                       upload = self.split(states, self.limit)
 
-               # Create datastreams
-               self.feed.datastreams = \
-                       [ Datastream(id=chan, datapoints=upload[chan])
-                         for chan in upload.keys() ]
+                       # Create datastreams
+                       feed.datastreams = \
+                               [ Datastream(id=chan, datapoints=upload[chan])
+                                 for chan in upload.keys() ]
 
-               # Clear queue
-               self.last  = datetime.utcnow()
-               self.queue = []
+                       # Upload it
+                       try:
+                               print('thread - upload start')
+                               feed.update()
+                               print('thread - upload done')
+                       except Exception as ex:
+                               self.error = str(ex)
+                               break
 
-               # Upload it
-               try:
-                       self.feed.update()
-               except Exception as ex:
-                       return str(ex)
+               # Notify main that we're done
+               print('thread - exit')
+               self.thread = None