1 from datetime import datetime
2 from Queue import Queue
3 from thread import get_ident, start_new_thread
5 from xively import XivelyAPIClient
6 from xively import Datastream
7 from xively import Datapoint
12 def __init__(self, config):
25 if not self.running():
26 self.last = datetime.utcnow()
27 start_new_thread(self.task, ())
35 return bool(self.thread)
# Buffer `state` in self.states and hand the buffer to the upload queue once
# more than self.config.maxrate seconds have elapsed since self.last was stamped.
# NOTE(review): original lines 39 and 45-47 are missing from this listing, so
# the action taken when the worker is not running, and anything done to
# self.states after it has been queued, cannot be confirmed from here.
37 def append(self, state):
38 if not self.running():
40 self.states.append(state)
# Time elapsed since self.last; both sides are naive datetimes from utcnow().
41 delta = datetime.utcnow() - self.last
# maxrate is compared against elapsed seconds, so it acts as the minimum
# interval between two hand-offs to the queue.
42 if delta.total_seconds() > self.config.maxrate:
# The list object itself is queued, not a copy.
# NOTE(review): presumably self.states is rebound to a fresh list right after
# this (lines not shown) - confirm, otherwise producer and consumer share it.
43 self.queue.put(self.states)
44 self.last = datetime.utcnow()
# Regroup sensor readings into per-channel lists of Datapoint objects.
# A channel id is the sensor attribute name followed by a per-reading suffix
# (e.g. 'acc' + 'x' -> 'accx', 'a2d' + '0' -> 'a2d0').
# NOTE(review): original lines 49-50, 52-54, 56-57, 60-62, 64, 67 and 70+ are
# missing from this listing; the loops over `items` and over the sensors, the
# way `limit` is enforced, and the return value are not visible here.
48 def split(self, items, limit):
# Maps a sensor attribute name to the suffixes of its channels; the suffix
# list is indexed with the same position as the sensor's data below.
51 sensors = {'acc': ['x','y','z'],
55 'a2d': ['0','1','2','3','4','5']}
# Readings for sensor `sns` are read off the state object by attribute name.
58 data = getattr(state, sns)
59 for i in range(len(data)):
# presumably printed when `limit` is reached - the guarding condition is not
# visible in this listing; TODO confirm
63 print('upload: cropping upload')
65 chan = sns + sensors[sns][i]
# dict.has_key() exists only in Python 2.
# `split` here is a bare name bound on a line not shown (not the method);
# presumably the missing line 67 creates the channel's empty list - confirm.
66 if not split.has_key(chan):
# One datapoint per reading, stamped with state.time.
68 point = Datapoint(state.time, data[i])
69 split[chan].append(point)
# NOTE(review): the enclosing `def` line is missing from this listing; this is
# presumably the body of self.task, the function handed to start_new_thread.
# Original lines 76-77, 80, 82, 86, 88-90, 93-95, 98-100, 102 and 105-107 are
# also missing, so the loop, the try statement, the validity check, the upload
# call and the exception handler body are not visible here.
# Visible flow: open the Xively feed, take a batch of states off self.queue,
# split it into per-channel datapoints and build one Datastream per channel.
74 print('thread - start')
# Record this thread's identifier on the instance.
75 self.thread = get_ident()
# API client and feed handle are built from config.apikey / config.feedid.
78 api = XivelyAPIClient(self.config.apikey)
79 feed = api.feeds.get(self.config.feedid)
81 # Process requests from main
83 # Pull data from queue (blocking)
84 states = self.queue.get()
85 print('thread - got %d states' % len(states))
87 # Make sure it's valid
# NOTE(review): the comment below says 500, but the cap actually passed is
# self.limit, whose value is not visible in this listing - confirm they agree.
91 # Split data and limit to 500 points per upload
92 upload = self.split(states, self.limit)
# One Datastream per channel id, carrying that channel's datapoints.
96 [ Datastream(id=chan, datapoints=upload[chan])
97 for chan in upload.keys() ]
# presumably the actual upload call sits between these two prints (line 102,
# not shown) - TODO confirm
101 print('thread - upload start')
103 print('thread - upload done')
# Broad catch around the upload path; the handler body is not visible.
104 except Exception as ex:
108 # Notify main that we're done
109 print('thread - exit')