3 from datetime import datetime
5 from xively import XivelyAPIClient
6 from xively import Datastream
7 from xively import Datapoint
9 if sys.version_info<(3,0,0):
10 from Queue import Queue
11 from thread import get_ident, start_new_thread
13 from queue import Queue
14 from _thread import get_ident, start_new_thread
18 def __init__(self, config):
31 if not self.running():
32 self.last = datetime.utcnow()
33 start_new_thread(self.task, ())
41 return bool(self.thread)
43 def append(self, state):
# Buffer one state and, once enough time has passed since the last hand-off,
# pass the accumulated buffer to the worker through self.queue.
# NOTE(review): this extract omits some lines of the method (for example,
# whatever runs when the worker is not running) -- confirm against the full file.
44 if not self.running():
46 self.states.append(state)
# Time elapsed since self.last was last set.
47 delta = datetime.utcnow() - self.last
# Hand off only when more than config.maxrate seconds have elapsed.
48 if delta.total_seconds() > self.config.maxrate:
49 self.queue.put(self.states)
# Restart the interval measurement from the moment of this hand-off.
50 self.last = datetime.utcnow()
54 def split(self, items, limit):
57 sensors = {'acc': ['x','y','z'],
61 'a2d': ['0','1','2','3','4','5']}
64 data = getattr(state, sns)
65 for i in range(len(data)):
69 print('upload: cropping upload')
71 chan = sns + sensors[sns][i]
72 if not split.has_key(chan):
74 point = Datapoint(state.time, data[i])
75 split[chan].append(point)
80 print('thread - start')
81 self.thread = get_ident()
84 api = XivelyAPIClient(self.config.apikey)
85 feed = api.feeds.get(self.config.feedid)
87 # Process requests from main
89 # Pull data from queue (blocking)
90 states = self.queue.get()
91 print('thread - got %d states' % len(states))
93 # Make sure it's valid
97 # Split data and limit to 500 points per upload
98 upload = self.split(states, self.limit)
102 [ Datastream(id=chan, datapoints=upload[chan])
103 for chan in upload.keys() ]
107 print('thread - upload start')
109 print('thread - upload done')
110 except Exception as ex:
114 # Notify main that we're done
115 print('thread - exit')