]> Pileus Git - ~andy/csm213a-hw/blob - vis/test.py
Add test code and xively example
[~andy/csm213a-hw] / vis / test.py
1 # Dependencies:
2 #   Python   2.7
3 #   PyGObject (gi.repository, GTK 3.0)
4 #   PySerial 2.6
5 #   Requests 1.2.3
6
7 import sys
8 import time
9 import datetime
10 import random
11
12 from gi.repository import Gtk
13 from gi.repository import GLib
14
15 from serial import Serial
16
17 from xively import XivelyAPIClient
18 from xively import Datastream
19 from xively import Datapoint
20
21
22 # GTK 3.0 Testing
class TestGtk:
        """Smoke test for the GTK bindings: show a window holding one label."""

        def __init__(self):
                """Build the top-level window and make it visible."""
                top = Gtk.Window()
                top.add(Gtk.Label('Hello, world!'))
                # Destroying the window ends the main loop started by run().
                top.connect('destroy', self.quit)
                top.show_all()

        def quit(self, win):
                """Signal handler for 'destroy'; `win` is the emitting widget."""
                Gtk.main_quit()

        def run(self):
                """Enter the GTK main loop; returns once quit() has been called."""
                Gtk.main()
36
37 # Serial I/O Testing
class TestSerial:
        """Exercise the serial link: send a few commands, then echo the replies.

        The class attributes below are defaults; `port` and `baud` can be
        overridden through the constructor, `delay` by assigning on the
        instance.
        """
        port  = '/dev/ttyACM0'  # default device path
        baud  = 9600            # default baud rate
        delay = 0.1             # pause after every write, in seconds

        def __init__(self, port=None, baud=None):
                """Open the serial port.

                port -- device path; falls back to the class-level `port`
                baud -- baud rate; falls back to the class-level `baud`
                """
                if port is not None:
                        self.port = port
                if baud is not None:
                        self.baud = baud
                self.serial = Serial(self.port, baudrate=self.baud)

        def write(self, line):
                """Send `line` followed by a newline, then pause for `delay`."""
                # str.encode() works on Python 2 and 3 alike, whereas
                # bytes(text, encoding) is accepted by Python 3 only.
                data = (line + '\n').encode('UTF-8')
                self.serial.write(data)
                # Presumably gives the device time to act on the command
                # before the next one arrives -- TODO confirm.
                time.sleep(self.delay)

        def send(self):
                """Send the two 'stop' commands used by this test."""
                self.write('stop light')
                self.write('stop touch')

        def recv(self, count=5):
                """Drop pending input, then read and print `count` lines.

                No read timeout is configured on the port, so this waits until
                `count` complete lines have arrived.

                Returns the decoded, whitespace-stripped lines as a list.
                """
                self.serial.flushInput()
                lines = []
                for _ in range(count):
                        raw = self.serial.readline()
                        # Undecodable bytes (line noise, or a line joined
                        # halfway through) are replaced instead of raising
                        # UnicodeDecodeError and aborting the test.
                        text = raw.decode('UTF-8', 'replace').strip()
                        print('>> [' + text + ']')
                        lines.append(text)
                return lines
59
60 # Xively Testing:
61 #   https://xively.com/dev/docs/api/quick_reference/api_resources/
62 #   https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/
63 #   https://github.com/xively/xively-python/blob/master/docs/index.rst
class TestXively:
        """Round-trip test against a Xively feed.

        send() uploads five sample datapoints to the 'acc' and 'mag'
        datastreams; recv() reads the 'acc' history back and prints it.
        """
        # Setup
        feed_id = 569539763
        # NOTE(review): this API key is committed in source. Consider loading
        # it from the environment and rotating the key.
        api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5'

        # Times (one fixed timestamp per sample datapoint, one minute apart)
        time0   = datetime.datetime(2014,2,3, 3,0,0)
        time1   = datetime.datetime(2014,2,3, 3,1,0)
        time2   = datetime.datetime(2014,2,3, 3,2,0)
        time3   = datetime.datetime(2014,2,3, 3,3,0)
        time4   = datetime.datetime(2014,2,3, 3,4,0)

        # Constructor
        def __init__(self):
                """Create the API client and fetch the feed `feed_id`."""
                self.api  = XivelyAPIClient(self.api_key)
                self.feed = self.api.feeds.get(self.feed_id)

        def _points(self):
                """Return a fresh list of the five sample datapoints.

                The n-th timestamp (counting from 1) carries the value
                [n, n, n]. A new list of new Datapoint objects is built on
                every call so the two datastreams never share instances.
                """
                times = [self.time0, self.time1, self.time2,
                         self.time3, self.time4]
                return [Datapoint(at, [n, n, n])
                        for n, at in enumerate(times, 1)]

        # Update the feed
        def send(self):
                """Upload the sample datapoints to 'acc' and 'mag'."""
                acc = Datastream(id='acc')
                acc.datapoints = self._points()

                mag = Datastream(id='mag')
                mag.datapoints = self._points()

                self.feed.datastreams = [ acc, mag ]
                self.feed.update()

        # Read it back
        def recv(self):
                """Print the 'acc' history between time0 and time4."""
                stream = self.feed.datastreams.get('acc')
                points = stream.datapoints.history(
                        start=self.time0, end=self.time4, interval=0)

                for p in points:
                        # `at` rather than `time`, so the time module is not
                        # shadowed inside this method.
                        at    = p._data['at']
                        value = p._data['value']
                        # str() keeps the concatenation from raising
                        # TypeError when the value is not already a string.
                        print('item: ' + str(at) + ' - ' + str(value))
114
115 # Main
def run_test(name):
        """Run the test called `name`.

        Returns True when `name` is one of 'gtk', 'serial' or 'xively',
        and False (without running anything) otherwise.
        """
        if name == 'gtk':
                gtk = TestGtk()
                gtk.run()
        elif name == 'serial':
                sio = TestSerial()
                sio.send()
                sio.recv()
        elif name == 'xively':
                xiv = TestXively()
                xiv.send()
                xiv.recv()
        else:
                return False
        return True

def main(names):
        """Run every test listed in `names`, in order.

        Unknown names are reported on stderr and skipped; the remaining
        tests still run. Returns 0 when every name was recognised and 1
        otherwise. An empty list prints a usage line and returns 0.
        """
        if not names:
                sys.stderr.write('usage: test.py [gtk] [serial] [xively]\n')
                return 0

        status = 0
        for name in names:
                if not run_test(name):
                        sys.stderr.write('unknown test: ' + name + '\n')
                        status = 1
        return status

# Guarded so that importing this module does not start any test.
if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))