]> Pileus Git - ~andy/csm213a-hw/commitdiff
Add test code and xively exmaple
authorAndy Spencer <andy753421@gmail.com>
Mon, 3 Feb 2014 04:17:38 +0000 (04:17 +0000)
committerAndy Spencer <andy753421@gmail.com>
Mon, 3 Feb 2014 04:17:38 +0000 (04:17 +0000)
.gitignore
vis/makefile
vis/test.py [new file with mode: 0644]

index 4b4699f7a3c2a1f00f6ce45e7e23664fe476d15b..536d98d01522b7016f8381a861715369ec2a9017 100644 (file)
@@ -7,4 +7,5 @@
 *.swp
 *~
 __pycache__
 *.swp
 *~
 __pycache__
+xively
 config.mk
 config.mk
index 26271df3f0ed32dd58ce08a3bc7da1e30d6987b6..a8665419dc54c66eed81c8d95e4331f32e4b0198 100644 (file)
@@ -1,5 +1,10 @@
-run:
+default: main
+
+main:
        python -u ./main.py
 
        python -u ./main.py
 
+test:
+       python -u ./test.py gtk serial xively
+
 clean:
        rm -rf *.pyc __pycache__
 clean:
        rm -rf *.pyc __pycache__
diff --git a/vis/test.py b/vis/test.py
new file mode 100644 (file)
index 0000000..c3e7ea2
--- /dev/null
@@ -0,0 +1,129 @@
+# Dependencies:
+#   Python   2.7
+#   PyGtk    2.24
+#   PySerial 2.6
+#   Requests 1.2.3
+
+import sys
+import time
+import datetime
+import random
+
+from gi.repository import Gtk
+from gi.repository import GLib
+
+from serial import Serial
+
+from xively import XivelyAPIClient
+from xively import Datastream
+from xively import Datapoint
+
+
+# GTK 3.0 Testing
+class TestGtk:
+       def __init__(self):
+               label  = Gtk.Label('Hello, world!')
+               window = Gtk.Window()
+               window.connect('destroy', self.quit)
+               window.add(label)
+               window.show_all()
+
+       def quit(self, win):
+               Gtk.main_quit()
+
+       def run(self):
+               Gtk.main()
+
+# Serial I/O Testing
+class TestSerial:
+       port = '/dev/ttyACM0'
+       baud = 9600
+
+       def __init__(self):
+               self.serial = Serial(self.port, baudrate = self.baud)
+
+       def write(self, line):
+               data = bytes(line+'\n', 'UTF-8')
+               self.serial.write(data)
+               time.sleep(0.1)
+
+       def send(self):
+               self.write('stop light')
+               self.write('stop touch')
+
+       def recv(self):
+               self.serial.flushInput()
+               for i in range(0,5):
+                       line = self.serial.readline()
+                       print('>> [' + line.decode().strip() + ']')
+
+# Xively Testing:
+#   https://xively.com/dev/docs/api/quick_reference/api_resources/
+#   https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/
+#   https://github.com/xively/xively-python/blob/master/docs/index.rst
+class TestXively:
+       # Setup
+       feed_id = 569539763
+       api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5'
+
+       # Times
+       time0   = datetime.datetime(2014,2,3, 3,0,0)
+       time1   = datetime.datetime(2014,2,3, 3,1,0)
+       time2   = datetime.datetime(2014,2,3, 3,2,0)
+       time3   = datetime.datetime(2014,2,3, 3,3,0)
+       time4   = datetime.datetime(2014,2,3, 3,4,0)
+
+       # Constructor
+       def __init__(self):
+               self.api  = XivelyAPIClient(self.api_key)
+               self.feed = self.api.feeds.get(self.feed_id)
+
+       # Update the feed
+       def send(self):
+               acc = Datastream(id='acc')
+               acc.datapoints = [
+                   Datapoint(self.time0, [1, 1, 1]),
+                   Datapoint(self.time1, [2, 2, 2]),
+                   Datapoint(self.time2, [3, 3, 3]),
+                   Datapoint(self.time3, [4, 4, 4]),
+                   Datapoint(self.time4, [5, 5, 5]),
+               ]
+
+               mag = Datastream(id='mag')
+               mag.datapoints = [
+                   Datapoint(self.time0, [1, 1, 1]),
+                   Datapoint(self.time1, [2, 2, 2]),
+                   Datapoint(self.time2, [3, 3, 3]),
+                   Datapoint(self.time3, [4, 4, 4]),
+                   Datapoint(self.time4, [5, 5, 5]),
+               ]
+
+               self.feed.datastreams = [ acc, mag ]
+               self.feed.update()
+
+       # Read it back
+       def recv(self):
+               stream = self.feed.datastreams.get('acc')
+               points = stream.datapoints.history( \
+                       start=self.time0, end=self.time4, interval=0)
+
+               for p in points:
+                       time  = p._data['at']
+                       value = p._data['value']
+                       print('item: ' + str(time) + ' - ' + value)
+
+# Main
+for test in sys.argv[1:]:
+       if test == 'gtk':
+               gtk = TestGtk()
+               gtk.run()
+
+       if test == 'serial':
+               sio = TestSerial()
+               sio.send()
+               sio.recv()
+
+       if test == 'xively':
+               xiv = TestXively()
+               xiv.send()
+               xiv.recv()