]> Pileus Git - ~andy/csm213a-hw/blob - vis/logger.py
Convert logger to use threads
[~andy/csm213a-hw] / vis / logger.py
1 from datetime import datetime
2 from Queue    import Queue
3 from thread   import get_ident, start_new_thread
4
5 from xively import XivelyAPIClient
6 from xively import Datastream
7 from xively import Datapoint
8
9
10 class Logger:
11         # Constructor
12         def __init__(self, config):
13                 self.config = config
14                 self.api    = None
15                 self.feed   = None
16                 self.last   = 0
17                 self.limit  = 500
18                 self.states = []
19                 self.queue  = Queue()
20                 self.thread = None
21                 self.error  = ''
22
23         # Methods
24         def connect(self):
25                 if not self.running():
26                         self.last = datetime.utcnow()
27                         start_new_thread(self.task, ())
28
29         def disconnect(self):
30                 if self.running():
31                         self.thread = None
32                         self.queue.put([])
33
34         def running(self):
35                 return bool(self.thread)
36
37         def append(self, state):
38                 if not self.running():
39                         return
40                 self.states.append(state)
41                 delta = datetime.utcnow() - self.last
42                 if delta.total_seconds() > self.config.maxrate:
43                         self.queue.put(self.states)
44                         self.last   = datetime.utcnow()
45                         self.states = []
46
47         # Private methods
48         def split(self, items, limit):
49                 split   = {}
50                 count   = 0
51                 sensors = {'acc': ['x','y','z'],
52                            'mag': ['x','y','z'],
53                            'tch': ['p','d'],
54                            'lgt': [''],
55                            'a2d': ['0','1','2','3','4','5']}
56                 for state in items:
57                         for sns in sensors:
58                                 data = getattr(state, sns)
59                                 for i in range(len(data)):
60                                         if data[i] == None:
61                                                 continue
62                                         if count >= limit:
63                                                 print('upload: cropping upload')
64                                                 return split
65                                         chan  = sns + sensors[sns][i]
66                                         if not split.has_key(chan):
67                                                 split[chan] = []
68                                         point = Datapoint(state.time, data[i])
69                                         split[chan].append(point)
70                                         count += 1
71                 return split
72
73         def task(self):
74                 print('thread - start')
75                 self.thread = get_ident()
76
77                 # Setup connection
78                 api  = XivelyAPIClient(self.config.apikey)
79                 feed = api.feeds.get(self.config.feedid)
80
81                 # Process requests from main
82                 while self.running():
83                         # Pull data from queue (blocking)
84                         states = self.queue.get()
85                         print('thread - got %d states' % len(states))
86
87                         # Make sure it's valid
88                         if len(states) == 0:
89                                 continue
90
91                         # Split data and limit to 500 points per upload
92                         upload = self.split(states, self.limit)
93
94                         # Create datastreams
95                         feed.datastreams = \
96                                 [ Datastream(id=chan, datapoints=upload[chan])
97                                   for chan in upload.keys() ]
98
99                         # Upload it
100                         try:
101                                 print('thread - upload start')
102                                 feed.update()
103                                 print('thread - upload done')
104                         except Exception as ex:
105                                 self.error = str(ex)
106                                 break
107
108                 # Notify main that we're done
109                 print('thread - exit')
110                 self.thread = None