Pileus Git - ~andy/csm213a-hw/blob - vis/test.py
Fix whitespace error
[~andy/csm213a-hw] / vis / test.py
1 # Dependencies:
2 #   Python   2.7
3 #   PyGtk    2.24
4 #   PySerial 2.6
5 #   Requests 1.2.3
6
7 import sys
8 import time
9 import datetime
10 import random
11
12 if sys.version_info<(3,0,0):
13         import pygtk as PyGtk
14         import gtk   as Gtk
15 else:
16         from gi.repository import Gtk
17         from gi.repository import GLib
18
19 from serial import Serial
20
21 from xively import XivelyAPIClient
22 from xively import Datastream
23 from xively import Datapoint
24
25
26 # GTK 3.0 Testing
class TestGtk:
        """Minimal GTK smoke test: one window containing one text label."""

        def __init__(self):
                """Build the window, hook up its destroy signal and show it."""
                window = Gtk.Window()
                # Leave the main loop as soon as the window is destroyed
                window.connect('destroy', self.quit)
                window.add(Gtk.Label('Hello, world!'))
                window.show_all()

        def quit(self, win):
                """Signal handler for 'destroy'; stops the GTK main loop."""
                Gtk.main_quit()

        def run(self):
                """Enter the GTK main loop; returns once quit() has been called."""
                Gtk.main()
40
41 # Serial I/O Testing
class TestSerial:
        """Serial I/O smoke test: send two command lines, then echo replies.

        The port name and baud rate are class attributes so they can be
        overridden on the class or on an instance before use.
        """

        port = '/dev/ttyACM0'
        baud = 9600

        def __init__(self):
                """Open the serial device at `port` using `baud` as baud rate."""
                self.serial = Serial(self.port, baudrate = self.baud)

        def write(self, line):
                """Send `line` followed by a newline, then pause for 0.1 s.

                line -- text of the command, without the trailing newline
                """
                # str.encode() works on both Python 2.7 and 3.x, whereas
                # bytes(text, 'UTF-8') raises TypeError on Python 2.7, where
                # bytes is an alias for str and accepts a single argument.
                data = (line + '\n').encode('UTF-8')
                self.serial.write(data)
                time.sleep(0.1)

        def send(self):
                """Send the 'stop light' and 'stop touch' command lines."""
                self.write('stop light')
                self.write('stop touch')

        def recv(self):
                """Discard buffered input, then read and print five lines."""
                self.serial.flushInput()
                for _ in range(5):
                        line = self.serial.readline()
                        print('>> [' + line.decode().strip() + ']')
63
64 # Xively Testing:
65 #   https://xively.com/dev/docs/api/quick_reference/api_resources/
66 #   https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/
67 #   https://github.com/xively/xively-python/blob/master/docs/index.rst
class TestXively:
        """Xively smoke test: upload fixed datapoints, then read them back.

        send() writes five datapoints to each of the 'acc' and 'mag'
        datastreams of the feed; recv() fetches the 'acc' history for the
        same time range and prints every point.
        """

        # Setup
        # NOTE(review): the API key is hard-coded in the source; consider
        # loading it from the environment or a config file instead.
        feed_id = 569539763
        api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5'

        # Times (five timestamps, one minute apart)
        time0   = datetime.datetime(2014,2,3, 3,0,0)
        time1   = datetime.datetime(2014,2,3, 3,1,0)
        time2   = datetime.datetime(2014,2,3, 3,2,0)
        time3   = datetime.datetime(2014,2,3, 3,3,0)
        time4   = datetime.datetime(2014,2,3, 3,4,0)

        # Constructor
        def __init__(self):
                """Create the API client and fetch the feed `feed_id`."""
                self.api  = XivelyAPIClient(self.api_key)
                self.feed = self.api.feeds.get(self.feed_id)

        # Build the test data
        def _samples(self):
                """Return five new datapoints: [n, n, n] at time(n-1), n = 1..5.

                A fresh list of fresh Datapoint objects is built on every
                call so that no datapoint is shared between datastreams.
                """
                times = [self.time0, self.time1, self.time2,
                         self.time3, self.time4]
                return [Datapoint(at, [n, n, n])
                        for n, at in enumerate(times, 1)]

        # Update the feed
        def send(self):
                """Replace the feed's datastreams with 'acc' and 'mag' and upload."""
                acc = Datastream(id='acc')
                acc.datapoints = self._samples()

                mag = Datastream(id='mag')
                mag.datapoints = self._samples()

                self.feed.datastreams = [ acc, mag ]
                self.feed.update()

        # Read it back
        def recv(self):
                """Print the 'acc' datapoints recorded between time0 and time4."""
                stream = self.feed.datastreams.get('acc')
                # NOTE(review): interval=0 presumably requests raw, un-averaged
                # datapoints -- confirm against the Xively API documentation.
                points = stream.datapoints.history(
                        start=self.time0, end=self.time4, interval=0)

                for p in points:
                        at    = p._data['at']
                        value = p._data['value']
                        # %-formatting accepts any value type; concatenating
                        # with '+' raises TypeError for non-string values.
                        print('item: %s - %s' % (at, value))
118
119 # Main
# Run every test named on the command line, in the order given;
# arguments that match no known test name are skipped.
for test in sys.argv[1:]:
        if test == 'gtk':
                # Opens the window and blocks until it is closed
                gtk = TestGtk()
                gtk.run()
        elif test == 'serial':
                # Send the commands first, then print what comes back
                sio = TestSerial()
                sio.send()
                sio.recv()
        elif test == 'xively':
                # Upload the datapoints, then read them back
                xiv = TestXively()
                xiv.send()
                xiv.recv()
133                 xiv.recv()