# Dependencies: # Python 2.7 # PyGtk 2.24 # PySerial 2.6 # Requests 1.2.3 import sys import time import datetime import random from gi.repository import Gtk from gi.repository import GLib from serial import Serial from xively import XivelyAPIClient from xively import Datastream from xively import Datapoint # GTK 3.0 Testing class TestGtk: def __init__(self): label = Gtk.Label('Hello, world!') window = Gtk.Window() window.connect('destroy', self.quit) window.add(label) window.show_all() def quit(self, win): Gtk.main_quit() def run(self): Gtk.main() # Serial I/O Testing class TestSerial: port = '/dev/ttyACM0' baud = 9600 def __init__(self): self.serial = Serial(self.port, baudrate = self.baud) def write(self, line): data = bytes(line+'\n', 'UTF-8') self.serial.write(data) time.sleep(0.1) def send(self): self.write('stop light') self.write('stop touch') def recv(self): self.serial.flushInput() for i in range(0,5): line = self.serial.readline() print('>> [' + line.decode().strip() + ']') # Xively Testing: # https://xively.com/dev/docs/api/quick_reference/api_resources/ # https://xively.com/dev/docs/api/data/write/multiple_datapoints_to_multiple_data_streams/ # https://github.com/xively/xively-python/blob/master/docs/index.rst class TestXively: # Setup feed_id = 569539763 api_key = 'Py7atAQt2vAI5AFeWsUJMDqpIE1Dra5b0rPzlnr2ZDvDDoI5' # Times time0 = datetime.datetime(2014,2,3, 3,0,0) time1 = datetime.datetime(2014,2,3, 3,1,0) time2 = datetime.datetime(2014,2,3, 3,2,0) time3 = datetime.datetime(2014,2,3, 3,3,0) time4 = datetime.datetime(2014,2,3, 3,4,0) # Constructor def __init__(self): self.api = XivelyAPIClient(self.api_key) self.feed = self.api.feeds.get(self.feed_id) # Update the feed def send(self): acc = Datastream(id='acc') acc.datapoints = [ Datapoint(self.time0, [1, 1, 1]), Datapoint(self.time1, [2, 2, 2]), Datapoint(self.time2, [3, 3, 3]), Datapoint(self.time3, [4, 4, 4]), Datapoint(self.time4, [5, 5, 5]), ] mag = Datastream(id='mag') mag.datapoints = [ Datapoint(self.time0, [1, 1, 1]), Datapoint(self.time1, [2, 2, 2]), Datapoint(self.time2, [3, 3, 3]), Datapoint(self.time3, [4, 4, 4]), Datapoint(self.time4, [5, 5, 5]), ] self.feed.datastreams = [ acc, mag ] self.feed.update() # Read it back def recv(self): stream = self.feed.datastreams.get('acc') points = stream.datapoints.history( \ start=self.time0, end=self.time4, interval=0) for p in points: time = p._data['at'] value = p._data['value'] print('item: ' + str(time) + ' - ' + value) # Main for test in sys.argv[1:]: if test == 'gtk': gtk = TestGtk() gtk.run() if test == 'serial': sio = TestSerial() sio.send() sio.recv() if test == 'xively': xiv = TestXively() xiv.send() xiv.recv()