]> Pileus Git - ~andy/csm213a-hw/blobdiff - vis/test.py
Add test code and xively exmaple
[~andy/csm213a-hw] / vis / test.py
diff --git a/vis/test.py b/vis/test.py
new file mode 100644 (file)
index 0000000..c3e7ea2
--- /dev/null
@@ -0,0 +1,129 @@
+# Dependencies:
+#   Python   2.7
+#   PyGtk    2.24
+#   PySerial 2.6
+#   Requests 1.2.3
+
+import sys
+import time
+import datetime
+import random
+
+from gi.repository import Gtk
+from gi.repository import GLib
+
+from serial import Serial
+
+from xively import XivelyAPIClient
+from xively import Datastream
+from xively import Datapoint
+
+
+# GTK 3.0 Testing
+class TestGtk:
+       def __init__(self):
+               label  = Gtk.Label('Hello, world!')
+               window = Gtk.Window()
+               window.connect('destroy', self.quit)
+               window.add(label)
+               window.show_all()
+
+       def quit(self, win):
+               Gtk.main_quit()
+
+       def run(self):
+               Gtk.main()
+
+# Serial I/O Testing
+class TestSerial:
+       port = '/dev/ttyACM0'
+       baud = 9600
+
+       def __init__(self):
+               self.serial = Serial(self.port, baudrate = self.baud)
+
+       def write(self, line):
+               data = bytes(line+'\n', 'UTF-8')
+               self.serial.write(data)
+               time.sleep(0.1)
+
+       def send(self):
+               self.write('stop light')
+               self.write('stop touch')
+
+       def recv(self):
+               self.serial.flushInput()
+               for i in range(0,5):
+                       line = self.serial.readline()
+                       print('>> [' + line.decode().strip() + ']')
+
+# Xively Testing:
+#   https://xively.com/dev/docs/api/quick_reference/api_resources/
+#   https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/
+#   https://github.com/xively/xively-python/blob/master/docs/index.rst
+class TestXively:
+       # Setup
+       feed_id = 569539763
+       api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5'
+
+       # Times
+       time0   = datetime.datetime(2014,2,3, 3,0,0)
+       time1   = datetime.datetime(2014,2,3, 3,1,0)
+       time2   = datetime.datetime(2014,2,3, 3,2,0)
+       time3   = datetime.datetime(2014,2,3, 3,3,0)
+       time4   = datetime.datetime(2014,2,3, 3,4,0)
+
+       # Constructor
+       def __init__(self):
+               self.api  = XivelyAPIClient(self.api_key)
+               self.feed = self.api.feeds.get(self.feed_id)
+
+       # Update the feed
+       def send(self):
+               acc = Datastream(id='acc')
+               acc.datapoints = [
+                   Datapoint(self.time0, [1, 1, 1]),
+                   Datapoint(self.time1, [2, 2, 2]),
+                   Datapoint(self.time2, [3, 3, 3]),
+                   Datapoint(self.time3, [4, 4, 4]),
+                   Datapoint(self.time4, [5, 5, 5]),
+               ]
+
+               mag = Datastream(id='mag')
+               mag.datapoints = [
+                   Datapoint(self.time0, [1, 1, 1]),
+                   Datapoint(self.time1, [2, 2, 2]),
+                   Datapoint(self.time2, [3, 3, 3]),
+                   Datapoint(self.time3, [4, 4, 4]),
+                   Datapoint(self.time4, [5, 5, 5]),
+               ]
+
+               self.feed.datastreams = [ acc, mag ]
+               self.feed.update()
+
+       # Read it back
+       def recv(self):
+               stream = self.feed.datastreams.get('acc')
+               points = stream.datapoints.history( \
+                       start=self.time0, end=self.time4, interval=0)
+
+               for p in points:
+                       time  = p._data['at']
+                       value = p._data['value']
+                       print('item: ' + str(time) + ' - ' + value)
+
+# Main
+for test in sys.argv[1:]:
+       if test == 'gtk':
+               gtk = TestGtk()
+               gtk.run()
+
+       if test == 'serial':
+               sio = TestSerial()
+               sio.send()
+               sio.recv()
+
+       if test == 'xively':
+               xiv = TestXively()
+               xiv.send()
+               xiv.recv()