]> Pileus Git - ~andy/csm213a-hw/blob - vis/test.py
Split Xively streams and debug upload
[~andy/csm213a-hw] / vis / test.py
1 # Dependencies:
2 #   Python   2.7
3 #   PyGtk    2.24
4 #   PySerial 2.6
5 #   Requests 1.2.3
6
7 import sys
8 import time
9 import datetime
10 import random
11
12 if sys.version_info<(3,0,0):
13         import pygtk as PyGtk
14         import gtk   as Gtk
15 else:
16         from gi.repository import Gtk
17         from gi.repository import GLib
18
19 from serial import Serial
20
21 from xively import XivelyAPIClient
22 from xively import Datastream
23 from xively import Datapoint
24
25
26 # GTK 3.0 Testing
class TestGtk:
        """Minimal GTK smoke test: a window holding a single text label."""

        def __init__(self):
                """Create the window, attach the label and show everything."""
                toplevel = Gtk.Window()
                # Closing the window ends the main loop (see quit below)
                toplevel.connect('destroy', self.quit)
                toplevel.add(Gtk.Label('Hello, world!'))
                toplevel.show_all()

        def quit(self, win):
                """'destroy' signal handler; `win` is the emitting window (unused)."""
                Gtk.main_quit()

        def run(self):
                """Enter the GTK main loop; returns once quit() has been called."""
                Gtk.main()
40
41 # Serial I/O Testing
class TestSerial:
        """Serial I/O smoke test: send two commands, then print five reply lines."""

        # Device node and line speed passed to Serial() by the constructor
        port = '/dev/ttyACM0'
        baud = 9600

        def __init__(self):
                """Open the serial device named by `port` at `baud`."""
                self.serial = Serial(self.port, baudrate=self.baud)

        def write(self, line):
                """Send `line` plus a trailing newline, then sleep for 0.1 s.

                line -- text to transmit, without the line terminator
                """
                # str.encode() behaves the same on Python 2 and 3; the
                # two-argument bytes(text, encoding) form only exists on
                # Python 3 and raises TypeError on Python 2.7.
                data = (line + '\n').encode('UTF-8')
                self.serial.write(data)
                time.sleep(0.1)

        def send(self):
                """Transmit the 'stop light' and 'stop touch' commands."""
                self.write('stop light')
                self.write('stop touch')

        def recv(self):
                """Discard pending input, then read and print five lines."""
                self.serial.flushInput()
                for _ in range(5):
                        line = self.serial.readline()
                        # The first line after a flush may be a partial one, so
                        # tolerate undecodable bytes instead of aborting the loop.
                        text = line.decode('UTF-8', 'replace').strip()
                        print('>> [' + text + ']')
63
64 # Xively Testing:
65 #   https://xively.com/dev/docs/api/quick_reference/api_resources/
66 #   https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/
67 #   https://github.com/xively/xively-python/blob/master/docs/index.rst
class TestXively:
        """Xively round-trip test.

        Uploads ten datapoints to each of the 'test_acc' and 'test_mag'
        datastreams, then reads part of the 'test_acc' history back and
        prints it.
        """

        # Setup
        # NOTE(review): the API key is committed in source; consider loading
        # it from the environment or a config file instead.
        feed_id = 569539763
        api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5'

        # Times: evaluated once, when the class body runs. timeN is N minutes
        # before `now`, so time0 is the NEWEST sample and time9 the OLDEST.
        now     = datetime.datetime.utcnow()
        time0   = now - datetime.timedelta(minutes=0)
        time1   = now - datetime.timedelta(minutes=1)
        time2   = now - datetime.timedelta(minutes=2)
        time3   = now - datetime.timedelta(minutes=3)
        time4   = now - datetime.timedelta(minutes=4)
        time5   = now - datetime.timedelta(minutes=5)
        time6   = now - datetime.timedelta(minutes=6)
        time7   = now - datetime.timedelta(minutes=7)
        time8   = now - datetime.timedelta(minutes=8)
        time9   = now - datetime.timedelta(minutes=9)

        # Constructor
        def __init__(self):
                """Create the API client and fetch the feed `feed_id`."""
                self.api  = XivelyAPIClient(self.api_key)
                self.feed = self.api.feeds.get(self.feed_id)

        # Update the feed
        def send(self):
                """Upload one datapoint per timestamp to both test datastreams."""
                times = (
                        self.time0, self.time1, self.time2, self.time3, self.time4,
                        self.time5, self.time6, self.time7, self.time8, self.time9,
                )

                # 'test_acc' carries a three-element list per datapoint
                acc_values = (
                        [ 0, 1, 1],
                        [10, 2, 2],
                        [20, 3, 3],
                        [30, 4, 4],
                        [40, 5, 5],
                        [50, 5, 5],
                        [60, 5, 5],
                        [70, 5, 5],
                        [80, 5, 5],
                        [90, 5, 5],
                )
                # 'test_mag' carries a single number: 0, 10, ..., 90
                mag_values = range(0, 100, 10)

                acc = Datastream(id='test_acc')
                acc.datapoints = [
                        Datapoint(at, value) for at, value in zip(times, acc_values)
                ]

                mag = Datastream(id='test_mag')
                mag.datapoints = [
                        Datapoint(at, value) for at, value in zip(times, mag_values)
                ]

                self.feed.datastreams = [ acc, mag ]
                self.feed.update()

        # Read it back
        def recv(self):
                """Print the 'test_acc' history between time4 and time0."""
                stream = self.feed.datastreams.get('test_acc')
                # time4 is four minutes BEFORE time0, so it has to be the start
                # of the window; passing them the other way round asks for a
                # range that ends before it begins.
                points = stream.datapoints.history(
                        start=self.time4, end=self.time0, interval=0)

                for p in points:
                        # Named `stamp` so the `time` module is not shadowed
                        stamp = p._data['at']
                        value = p._data['value']
                        # str() on both parts: concatenating a non-string value
                        # to a str raises TypeError
                        print('item: ' + str(stamp) + ' - ' + str(value))
134
135 # Main
# Each command-line argument names one test; they run in the order given.
# Arguments that match none of the names below are skipped.
for test in sys.argv[1:]:
        if test == 'gtk':
                # Blocks in the GTK main loop until the window is closed
                gtk = TestGtk()
                gtk.run()
        elif test == 'serial':
                # Send the commands, then print the replies
                sio = TestSerial()
                sio.send()
                sio.recv()
        elif test == 'xively':
                # Upload the datapoints, then read them back
                xiv = TestXively()
                xiv.send()
                xiv.recv()
149                 xiv.recv()