From: Andy Spencer Date: Mon, 3 Feb 2014 04:17:38 +0000 (+0000) Subject: Add test code and xively exmaple X-Git-Url: http://pileus.org/git/?p=~andy%2Fcsm213a-hw;a=commitdiff_plain;h=4d876a1f029cf2455304f0281c273eeb5df327ac Add test code and xively exmaple --- diff --git a/.gitignore b/.gitignore index 4b4699f..536d98d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ *.swp *~ __pycache__ +xively config.mk diff --git a/vis/makefile b/vis/makefile index 26271df..a866541 100644 --- a/vis/makefile +++ b/vis/makefile @@ -1,5 +1,10 @@ -run: +default: main + +main: python -u ./main.py +test: + python -u ./test.py gtk serial xively + clean: rm -rf *.pyc __pycache__ diff --git a/vis/test.py b/vis/test.py new file mode 100644 index 0000000..c3e7ea2 --- /dev/null +++ b/vis/test.py @@ -0,0 +1,129 @@ +# Dependencies: +# Python 2.7 +# PyGtk 2.24 +# PySerial 2.6 +# Requests 1.2.3 + +import sys +import time +import datetime +import random + +from gi.repository import Gtk +from gi.repository import GLib + +from serial import Serial + +from xively import XivelyAPIClient +from xively import Datastream +from xively import Datapoint + + +# GTK 3.0 Testing +class TestGtk: + def __init__(self): + label = Gtk.Label('Hello, world!') + window = Gtk.Window() + window.connect('destroy', self.quit) + window.add(label) + window.show_all() + + def quit(self, win): + Gtk.main_quit() + + def run(self): + Gtk.main() + +# Serial I/O Testing +class TestSerial: + port = '/dev/ttyACM0' + baud = 9600 + + def __init__(self): + self.serial = Serial(self.port, baudrate = self.baud) + + def write(self, line): + data = bytes(line+'\n', 'UTF-8') + self.serial.write(data) + time.sleep(0.1) + + def send(self): + self.write('stop light') + self.write('stop touch') + + def recv(self): + self.serial.flushInput() + for i in range(0,5): + line = self.serial.readline() + print('>> [' + line.decode().strip() + ']') + +# Xively Testing: +# https://xively.com/dev/docs/api/quick_reference/api_resources/ +# https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/ +# https://github.com/xively/xively-python/blob/master/docs/index.rst +class TestXively: + # Setup + feed_id = 569539763 + api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5' + + # Times + time0 = datetime.datetime(2014,2,3, 3,0,0) + time1 = datetime.datetime(2014,2,3, 3,1,0) + time2 = datetime.datetime(2014,2,3, 3,2,0) + time3 = datetime.datetime(2014,2,3, 3,3,0) + time4 = datetime.datetime(2014,2,3, 3,4,0) + + # Constructor + def __init__(self): + self.api = XivelyAPIClient(self.api_key) + self.feed = self.api.feeds.get(self.feed_id) + + # Update the feed + def send(self): + acc = Datastream(id='acc') + acc.datapoints = [ + Datapoint(self.time0, [1, 1, 1]), + Datapoint(self.time1, [2, 2, 2]), + Datapoint(self.time2, [3, 3, 3]), + Datapoint(self.time3, [4, 4, 4]), + Datapoint(self.time4, [5, 5, 5]), + ] + + mag = Datastream(id='mag') + mag.datapoints = [ + Datapoint(self.time0, [1, 1, 1]), + Datapoint(self.time1, [2, 2, 2]), + Datapoint(self.time2, [3, 3, 3]), + Datapoint(self.time3, [4, 4, 4]), + Datapoint(self.time4, [5, 5, 5]), + ] + + self.feed.datastreams = [ acc, mag ] + self.feed.update() + + # Read it back + def recv(self): + stream = self.feed.datastreams.get('acc') + points = stream.datapoints.history( \ + start=self.time0, end=self.time4, interval=0) + + for p in points: + time = p._data['at'] + value = p._data['value'] + print('item: ' + str(time) + ' - ' + value) + +# Main +for test in sys.argv[1:]: + if test == 'gtk': + gtk = TestGtk() + gtk.run() + + if test == 'serial': + sio = TestSerial() + sio.send() + sio.recv() + + if test == 'xively': + xiv = TestXively() + xiv.send() + xiv.recv()