From bd11bb6e3a913059ad673f4cb5816e09b8561447 Mon Sep 17 00:00:00 2001 From: Andy Spencer Date: Fri, 14 Feb 2014 08:07:48 +0000 Subject: [PATCH] Convert logger to use threads --- vis/logger.py | 141 +++++++++++++++++++++++++++++--------------------- 1 file changed, 81 insertions(+), 60 deletions(-) diff --git a/vis/logger.py b/vis/logger.py index 80cbd3a..97a7a3b 100644 --- a/vis/logger.py +++ b/vis/logger.py @@ -1,4 +1,6 @@ from datetime import datetime +from Queue import Queue +from thread import get_ident, start_new_thread from xively import XivelyAPIClient from xively import Datastream @@ -6,84 +8,103 @@ from xively import Datapoint class Logger: - api = None - feed = None - queue = [] - last = 0 - # Constructor def __init__(self, config): self.config = config + self.api = None + self.feed = None + self.last = 0 + self.limit = 500 + self.states = [] + self.queue = Queue() + self.thread = None + self.error = '' # Methods def connect(self): - self.api = XivelyAPIClient(self.config.apikey) - self.feed = self.api.feeds.get(self.config.feedid) - self.last = datetime.utcnow() - self.queue = [] + if not self.running(): + self.last = datetime.utcnow() + start_new_thread(self.task, ()) def disconnect(self): - self.api = None - self.feed = None + if self.running(): + self.thread = None + self.queue.put([]) def running(self): - if not self.api or not self.feed: - return False - return True + return bool(self.thread) def append(self, state): - if not self.feed: + if not self.running(): return - self.queue.append(state) + self.states.append(state) delta = datetime.utcnow() - self.last if delta.total_seconds() > self.config.maxrate: - return self.flush() + self.queue.put(self.states) + self.last = datetime.utcnow() + self.states = [] # Private methods - def flush(self): - def split(items, limit): - split = {} - count = 0 - sensors = {'acc': ['x','y','z'], - 'mag': ['x','y','z'], - 'tch': ['p','d'], - 'lgt': [''], - 'a2d': ['0','1','2','3','4','5']} - for state in items: - for sns in sensors: - data = getattr(state, sns) - for i in range(len(data)): - if data[i] == None: - continue - if count >= limit: - print('upload: cropping upload') - return split - chan = sns + sensors[sns][i] - if not split.has_key(chan): - split[chan] = [] - point = Datapoint(state.time, data[i]) - split[chan].append(point) - count += 1 - return split - - - if not self.running() or len(self.queue) == 0: - return + def split(self, items, limit): + split = {} + count = 0 + sensors = {'acc': ['x','y','z'], + 'mag': ['x','y','z'], + 'tch': ['p','d'], + 'lgt': [''], + 'a2d': ['0','1','2','3','4','5']} + for state in items: + for sns in sensors: + data = getattr(state, sns) + for i in range(len(data)): + if data[i] == None: + continue + if count >= limit: + print('upload: cropping upload') + return split + chan = sns + sensors[sns][i] + if not split.has_key(chan): + split[chan] = [] + point = Datapoint(state.time, data[i]) + split[chan].append(point) + count += 1 + return split + + def task(self): + print('thread - start') + self.thread = get_ident() + + # Setup connection + api = XivelyAPIClient(self.config.apikey) + feed = api.feeds.get(self.config.feedid) + + # Process requests from main + while self.running(): + # Pull data from queue (blocking) + states = self.queue.get() + print('thread - got %d states' % len(states)) + + # Make sure it's valid + if len(states) == 0: + continue - # Split data and limit to 500 points per upload - upload = split(self.queue, 500) + # Split data and limit to 500 points per upload + upload = self.split(states, self.limit) - # Create datastreams - self.feed.datastreams = \ - [ Datastream(id=chan, datapoints=upload[chan]) - for chan in upload.keys() ] + # Create datastreams + feed.datastreams = \ + [ Datastream(id=chan, datapoints=upload[chan]) + for chan in upload.keys() ] - # Clear queue - self.last = datetime.utcnow() - self.queue = [] + # Upload it + try: + print('thread - upload start') + feed.update() + print('thread - upload done') + except Exception as ex: + self.error = str(ex) + break - # Upload it - try: - self.feed.update() - except Exception as ex: - return str(ex) + # Notify main that we're done + print('thread - exit') + self.thread = None -- 2.43.2