import sys from datetime import datetime from xively import XivelyAPIClient from xively import Datastream from xively import Datapoint if sys.version_info<(3,0,0): from Queue import Queue from thread import get_ident, start_new_thread else: from queue import Queue from _thread import get_ident, start_new_thread class Logger: # Constructor def __init__(self, config): self.config = config self.api = None self.feed = None self.last = 0 self.limit = 500 self.states = [] self.queue = Queue() self.thread = None self.error = '' # Methods def connect(self): if not self.running(): self.last = datetime.utcnow() start_new_thread(self.task, ()) def disconnect(self): if self.running(): self.thread = None self.queue.put([]) def running(self): return bool(self.thread) def append(self, state): if not self.running(): return self.states.append(state) delta = datetime.utcnow() - self.last if delta.total_seconds() > self.config.maxrate: self.queue.put(self.states) self.last = datetime.utcnow() self.states = [] # Private methods def split(self, items, limit): split = {} count = 0 sensors = {'acc': ['x','y','z'], 'mag': ['x','y','z'], 'tch': ['p','d'], 'lgt': [''], 'a2d': ['0','1','2','3','4','5']} for state in items: for sns in sensors: data = getattr(state, sns) for i in range(len(data)): if data[i] == None: continue if count >= limit: print('upload: cropping upload') return split chan = sns + sensors[sns][i] if not split.has_key(chan): split[chan] = [] point = Datapoint(state.time, data[i]) split[chan].append(point) count += 1 return split def task(self): print('thread - start') self.thread = get_ident() # Setup connection api = XivelyAPIClient(self.config.apikey) feed = api.feeds.get(self.config.feedid) # Process requests from main while self.running(): # Pull data from queue (blocking) states = self.queue.get() print('thread - got %d states' % len(states)) # Make sure it's valid if len(states) == 0: continue # Split data and limit to 500 points per upload upload = self.split(states, self.limit) # Create datastreams feed.datastreams = \ [ Datastream(id=chan, datapoints=upload[chan]) for chan in upload.keys() ] # Upload it try: print('thread - upload start') feed.update() print('thread - upload done') except Exception as ex: self.error = str(ex) break # Notify main that we're done print('thread - exit') self.thread = None