# Dependencies: # Python 2.7 # PyGtk 2.24 # PySerial 2.6 # Requests 1.2.3 import sys import time import datetime import random if sys.version_info<(3,0,0): import pygtk as PyGtk import gtk as Gtk else: from gi.repository import Gtk from gi.repository import GLib from serial import Serial from xively import XivelyAPIClient from xively import Datastream from xively import Datapoint # GTK 3.0 Testing class TestGtk: def __init__(self): label = Gtk.Label('Hello, world!') window = Gtk.Window() window.connect('destroy', self.quit) window.add(label) window.show_all() def quit(self, win): Gtk.main_quit() def run(self): Gtk.main() # Serial I/O Testing class TestSerial: port = '/dev/ttyACM0' baud = 9600 def __init__(self): self.serial = Serial(self.port, baudrate = self.baud) def write(self, line): data = bytes(line+'\n', 'UTF-8') self.serial.write(data) time.sleep(0.1) def send(self): self.write('stop light') self.write('stop touch') def recv(self): self.serial.flushInput() for i in range(0,5): line = self.serial.readline() print('>> [' + line.decode().strip() + ']') # Xively Testing: # https://xively.com/dev/docs/api/quick_reference/api_resources/ # https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/ # https://github.com/xively/xively-python/blob/master/docs/index.rst class TestXively: # Setup feed_id = 569539763 api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5' # Times now = datetime.datetime.utcnow(); time0 = now-datetime.timedelta(0,0*60); # datetime.datetime(2014,2,3, 3,0,0) time1 = now-datetime.timedelta(0,1*60); # datetime.datetime(2014,2,3, 3,1,0) time2 = now-datetime.timedelta(0,2*60); # datetime.datetime(2014,2,3, 3,2,0) time3 = now-datetime.timedelta(0,3*60); # datetime.datetime(2014,2,3, 3,3,0) time4 = now-datetime.timedelta(0,4*60); # datetime.datetime(2014,2,3, 3,4,0) time5 = now-datetime.timedelta(0,5*60); # datetime.datetime(2014,2,3, 3,4,0) time6 = now-datetime.timedelta(0,6*60); # datetime.datetime(2014,2,3, 3,4,0) time7 = now-datetime.timedelta(0,7*60); # datetime.datetime(2014,2,3, 3,4,0) time8 = now-datetime.timedelta(0,8*60); # datetime.datetime(2014,2,3, 3,4,0) time9 = now-datetime.timedelta(0,9*60); # datetime.datetime(2014,2,3, 3,4,0) # Constructor def __init__(self): self.api = XivelyAPIClient(self.api_key) self.feed = self.api.feeds.get(self.feed_id) # Update the feed def send(self): acc = Datastream(id='test_acc') acc.datapoints = [ Datapoint(self.time0, [00, 1, 1]), Datapoint(self.time1, [10, 2, 2]), Datapoint(self.time2, [20, 3, 3]), Datapoint(self.time3, [30, 4, 4]), Datapoint(self.time4, [40, 5, 5]), Datapoint(self.time5, [50, 5, 5]), Datapoint(self.time6, [60, 5, 5]), Datapoint(self.time7, [70, 5, 5]), Datapoint(self.time8, [80, 5, 5]), Datapoint(self.time9, [90, 5, 5]), ] mag = Datastream(id='test_mag') mag.datapoints = [ Datapoint(self.time0, 00), Datapoint(self.time1, 10), Datapoint(self.time2, 20), Datapoint(self.time3, 30), Datapoint(self.time4, 40), Datapoint(self.time5, 50), Datapoint(self.time6, 60), Datapoint(self.time7, 70), Datapoint(self.time8, 80), Datapoint(self.time9, 90), ] self.feed.datastreams = [ acc, mag ] self.feed.update() # Read it back def recv(self): stream = self.feed.datastreams.get('test_acc') points = stream.datapoints.history( \ start=self.time0, end=self.time4, interval=0) for p in points: time = p._data['at'] value = p._data['value'] print('item: ' + str(time) + ' - ' + value) # Main for test in sys.argv[1:]: if test == 'gtk': gtk = TestGtk() gtk.run() if test == 'serial': sio = TestSerial() sio.send() sio.recv() if test == 'xively': xiv = TestXively() xiv.send() xiv.recv()