from datetime import datetime
+from Queue import Queue
+from thread import get_ident, start_new_thread
from xively import XivelyAPIClient
from xively import Datastream
class Logger:
- api = None
- feed = None
- queue = []
- last = 0
-
# Constructor
def __init__(self, config):
self.config = config
+ self.api = None
+ self.feed = None
+ self.last = 0
+ self.limit = 500
+ self.states = []
+ self.queue = Queue()
+ self.thread = None
+ self.error = ''
# Methods
def connect(self):
- self.api = XivelyAPIClient(self.config.apikey)
- self.feed = self.api.feeds.get(self.config.feedid)
- self.last = datetime.utcnow()
- self.queue = []
+ if not self.running():
+ self.last = datetime.utcnow()
+ start_new_thread(self.task, ())
def disconnect(self):
- self.api = None
- self.feed = None
+ if self.running():
+ self.thread = None
+ self.queue.put([])
def running(self):
- if not self.api or not self.feed:
- return False
- return True
+ return bool(self.thread)
def append(self, state):
- if not self.feed:
+ if not self.running():
return
- self.queue.append(state)
+ self.states.append(state)
delta = datetime.utcnow() - self.last
if delta.total_seconds() > self.config.maxrate:
- return self.flush()
+ self.queue.put(self.states)
+ self.last = datetime.utcnow()
+ self.states = []
# Private methods
- def flush(self):
- def split(items, limit):
- split = {}
- count = 0
- sensors = {'acc': ['x','y','z'],
- 'mag': ['x','y','z'],
- 'tch': ['p','d'],
- 'lgt': [''],
- 'a2d': ['0','1','2','3','4','5']}
- for state in items:
- for sns in sensors:
- data = getattr(state, sns)
- for i in range(len(data)):
- if data[i] == None:
- continue
- if count >= limit:
- print('upload: cropping upload')
- return split
- chan = sns + sensors[sns][i]
- if not split.has_key(chan):
- split[chan] = []
- point = Datapoint(state.time, data[i])
- split[chan].append(point)
- count += 1
- return split
-
-
- if not self.running() or len(self.queue) == 0:
- return
+ def split(self, items, limit):
+ split = {}
+ count = 0
+ sensors = {'acc': ['x','y','z'],
+ 'mag': ['x','y','z'],
+ 'tch': ['p','d'],
+ 'lgt': [''],
+ 'a2d': ['0','1','2','3','4','5']}
+ for state in items:
+ for sns in sensors:
+ data = getattr(state, sns)
+ for i in range(len(data)):
+ if data[i] == None:
+ continue
+ if count >= limit:
+ print('upload: cropping upload')
+ return split
+ chan = sns + sensors[sns][i]
+ if not split.has_key(chan):
+ split[chan] = []
+ point = Datapoint(state.time, data[i])
+ split[chan].append(point)
+ count += 1
+ return split
+
+ def task(self):
+ print('thread - start')
+ self.thread = get_ident()
+
+ # Setup connection
+ api = XivelyAPIClient(self.config.apikey)
+ feed = api.feeds.get(self.config.feedid)
+
+ # Process requests from main
+ while self.running():
+ # Pull data from queue (blocking)
+ states = self.queue.get()
+ print('thread - got %d states' % len(states))
+
+ # Make sure it's valid
+ if len(states) == 0:
+ continue
- # Split data and limit to 500 points per upload
- upload = split(self.queue, 500)
+ # Split data and limit to 500 points per upload
+ upload = self.split(states, self.limit)
- # Create datastreams
- self.feed.datastreams = \
- [ Datastream(id=chan, datapoints=upload[chan])
- for chan in upload.keys() ]
+ # Create datastreams
+ feed.datastreams = \
+ [ Datastream(id=chan, datapoints=upload[chan])
+ for chan in upload.keys() ]
- # Clear queue
- self.last = datetime.utcnow()
- self.queue = []
+ # Upload it
+ try:
+ print('thread - upload start')
+ feed.update()
+ print('thread - upload done')
+ except Exception as ex:
+ self.error = str(ex)
+ break
- # Upload it
- try:
- self.feed.update()
- except Exception as ex:
- return str(ex)
+ # Notify main that we're done
+ print('thread - exit')
+ self.thread = None