+import sys
+
from datetime import datetime
from xively import XivelyAPIClient
from xively import Datastream
from xively import Datapoint
+if sys.version_info<(3,0,0):
+ from Queue import Queue
+ from thread import get_ident, start_new_thread
+else:
+ from queue import Queue
+ from _thread import get_ident, start_new_thread
class Logger:
- api = None
- feed = None
- queue = []
- last = 0
-
- # Methods
+ # Constructor
def __init__(self, config):
self.config = config
+ self.api = None
+ self.feed = None
+ self.last = 0
+ self.limit = 500
+ self.states = []
+ self.queue = Queue()
+ self.thread = None
+ self.error = ''
# Methods
def connect(self):
- self.api = XivelyAPIClient(self.config.apikey)
- self.feed = self.api.feeds.get(self.config.feedid)
- self.last = datetime.utcnow()
- self.queue = []
+ if not self.running():
+ self.last = datetime.utcnow()
+ start_new_thread(self.task, ())
def disconnect(self):
- self.api = None
- self.feed = None
+ if self.running():
+ self.thread = None
+ self.queue.put([])
def running(self):
- if not self.api or not self.feed:
- return False
- return True
+ return bool(self.thread)
def append(self, state):
- if not self.feed:
+ if not self.running():
return
- self.queue.append(state)
+ self.states.append(state)
delta = datetime.utcnow() - self.last
if delta.total_seconds() > self.config.maxrate:
- self.flush()
+ self.queue.put(self.states)
+ self.last = datetime.utcnow()
+ self.states = []
# Private methods
- def flush(self):
- def isset(state, key):
- value = getattr(state, key)
- return any([x!=None for x in value])
- def get(items, key):
- points = [ Datapoint(s.time, getattr(s,key)) \
- for s in items if isset(s,key) ]
- stream = Datastream(id=key)
- stream.datapoints = points
- return stream
-
- if not self.running() or len(self.queue) == 0:
- return
+ def split(self, items, limit):
+ split = {}
+ count = 0
+ sensors = {'acc': ['x','y','z'],
+ 'mag': ['x','y','z'],
+ 'tch': ['p','d'],
+ 'lgt': [''],
+ 'a2d': ['0','1','2','3','4','5']}
+ for state in items:
+ for sns in sensors:
+ data = getattr(state, sns)
+ for i in range(len(data)):
+ if data[i] == None:
+ continue
+ if count >= limit:
+ print('upload: cropping upload')
+ return split
+ chan = sns + sensors[sns][i]
+ if not split.has_key(chan):
+ split[chan] = []
+ point = Datapoint(state.time, data[i])
+ split[chan].append(point)
+ count += 1
+ return split
+
+ def task(self):
+ print('thread - start')
+ self.thread = get_ident()
+
+ # Setup connection
+ api = XivelyAPIClient(self.config.apikey)
+ feed = api.feeds.get(self.config.feedid)
+
+ # Process requests from main
+ while self.running():
+ # Pull data from queue (blocking)
+ states = self.queue.get()
+ print('thread - got %d states' % len(states))
+
+ # Make sure it's valid
+ if len(states) == 0:
+ continue
+
+ # Split data and limit to 500 points per upload
+ upload = self.split(states, self.limit)
+
+ # Create datastreams
+ feed.datastreams = \
+ [ Datastream(id=chan, datapoints=upload[chan])
+ for chan in upload.keys() ]
+
+ # Upload it
+ try:
+ print('thread - upload start')
+ feed.update()
+ print('thread - upload done')
+ except Exception as ex:
+ self.error = str(ex)
+ break
- self.feed.datastreams = [
- get(self.queue, 'acc'),
- get(self.queue, 'mag'),
- get(self.queue, 'touch'),
- get(self.queue, 'light'),
- get(self.queue, 'a2d'),
- ]
- self.feed.update()
-
- self.last = datetime.utcnow()
- self.queue = []
+ # Notify main that we're done
+ print('thread - exit')
+ self.thread = None