+import sys
+
+from datetime import datetime
+
+from xively import XivelyAPIClient
+from xively import Datastream
+from xively import Datapoint
+
+if sys.version_info<(3,0,0):
+ from Queue import Queue
+ from thread import get_ident, start_new_thread
+else:
+ from queue import Queue
+ from _thread import get_ident, start_new_thread
+
class Logger:
+ # Constructor
def __init__(self, config):
self.config = config
+ self.api = None
+ self.feed = None
+ self.last = 0
+ self.limit = 500
+ self.states = []
+ self.queue = Queue()
+ self.thread = None
+ self.error = ''
+ # Methods
def connect(self):
- pass
+ if not self.running():
+ self.last = datetime.utcnow()
+ start_new_thread(self.task, ())
+
+ def disconnect(self):
+ if self.running():
+ self.thread = None
+ self.queue.put([])
+
+ def running(self):
+ return bool(self.thread)
+
+ def append(self, state):
+ if not self.running():
+ return
+ self.states.append(state)
+ delta = datetime.utcnow() - self.last
+ if delta.total_seconds() > self.config.maxrate:
+ self.queue.put(self.states)
+ self.last = datetime.utcnow()
+ self.states = []
+
+ # Private methods
+ def split(self, items, limit):
+ split = {}
+ count = 0
+ sensors = {'acc': ['x','y','z'],
+ 'mag': ['x','y','z'],
+ 'tch': ['p','d'],
+ 'lgt': [''],
+ 'a2d': ['0','1','2','3','4','5']}
+ for state in items:
+ for sns in sensors:
+ data = getattr(state, sns)
+ for i in range(len(data)):
+ if data[i] == None:
+ continue
+ if count >= limit:
+ print('upload: cropping upload')
+ return split
+ chan = sns + sensors[sns][i]
+ if not split.has_key(chan):
+ split[chan] = []
+ point = Datapoint(state.time, data[i])
+ split[chan].append(point)
+ count += 1
+ return split
+
+ def task(self):
+ print('thread - start')
+ self.thread = get_ident()
+
+ # Setup connection
+ api = XivelyAPIClient(self.config.apikey)
+ feed = api.feeds.get(self.config.feedid)
+
+ # Process requests from main
+ while self.running():
+ # Pull data from queue (blocking)
+ states = self.queue.get()
+ print('thread - got %d states' % len(states))
+
+ # Make sure it's valid
+ if len(states) == 0:
+ continue
+
+ # Split data and limit to 500 points per upload
+ upload = self.split(states, self.limit)
+
+ # Create datastreams
+ feed.datastreams = \
+ [ Datastream(id=chan, datapoints=upload[chan])
+ for chan in upload.keys() ]
+
+ # Upload it
+ try:
+ print('thread - upload start')
+ feed.update()
+ print('thread - upload done')
+ except Exception as ex:
+ self.error = str(ex)
+ break
+
+ # Notify main that we're done
+ print('thread - exit')
+ self.thread = None