#!/usr/bin/env python
# host.py
from numpy import cumsum,asarray,arange
from math import *
import serial, time
import struct
import sys
import matplotlib as plt
def read_int32(bytes, signed=False):
    """Decode four little-endian bytes into an integer.

    Args:
        bytes: Iterable of exactly four byte values (anything that
            ``bytearray()`` accepts).  The name shadows the builtin but is
            kept so that existing keyword callers keep working.
        signed: When true the value is unpacked as a signed 32-bit integer
            (``"<i"``); otherwise as an unsigned one (``"<I"``).

    Returns:
        The decoded integer, or ``None`` (after printing a diagnostic) when
        the input cannot be unpacked.
    """
    fmt = "<i" if signed else "<I"
    try:
        return struct.unpack(fmt, bytearray(bytes))[0]
    except (struct.error, TypeError, ValueError):
        # Only malformed-data errors are handled here; a bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        # Single-argument print keeps the same output on Python 2 and 3.
        print("%s %s" % ("not properly formatted bytes: ", list(bytes)))
        return None
def read_int16(bytes, signed=False):
    """Decode two little-endian bytes into an integer.

    Args:
        bytes: Iterable of exactly two byte values (anything that
            ``bytearray()`` accepts).  The name shadows the builtin but is
            kept so that existing keyword callers keep working.
        signed: When true the value is unpacked as a signed 16-bit integer
            (``"<h"``); otherwise as an unsigned one (``"<H"``).

    Returns:
        The decoded integer, or ``None`` (after printing a diagnostic) when
        the input cannot be unpacked.
    """
    fmt = "<h" if signed else "<H"
    try:
        return struct.unpack(fmt, bytearray(bytes))[0]
    except (struct.error, TypeError, ValueError):
        # Only malformed-data errors are handled here; a bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        # Single-argument print keeps the same output on Python 2 and 3.
        print("%s %s" % ("not properly formatted bytes: ", list(bytes)))
        return None
def encode_int32(n, signed=False):
    """Pack *n* as a 4-byte little-endian integer and return the bytes as a list.

    Args:
        n: Integer to encode.
        signed: Use the signed format (``"<i"``) when true, otherwise the
            unsigned one (``"<I"``).

    Returns:
        ``list()`` of the packed data: integers on Python 3, one-character
        strings on Python 2.

    Raises:
        struct.error: If *n* does not fit the selected format.
    """
    if signed:
        packed = struct.pack("<i", n)
    else:
        packed = struct.pack("<I", n)
    return list(packed)
def encode_int16(n, signed=False):
    """Pack *n* as a 2-byte little-endian integer and return the bytes as a list.

    Args:
        n: Integer to encode.
        signed: Use the signed format (``"<h"``) when true, otherwise the
            unsigned one (``"<H"``).

    Returns:
        ``list()`` of the packed data: integers on Python 3, one-character
        strings on Python 2.

    Raises:
        struct.error: If *n* does not fit the selected format.
    """
    if signed:
        packed = struct.pack("<h", n)
    else:
        packed = struct.pack("<H", n)
    return list(packed)
def encode_char(n, signed=False):
    """Pack *n* as a single byte and return it wrapped in a list.

    Args:
        n: Integer to encode.
        signed: Use the signed format (``"<b"``) when true, otherwise the
            unsigned one (``"<B"``).

    Returns:
        ``list()`` of the packed data: a one-element list holding an integer
        on Python 3 or a one-character string on Python 2.

    Raises:
        struct.error: If *n* does not fit the selected format.
    """
    if signed:
        packed = struct.pack("<b", n)
    else:
        packed = struct.pack("<B", n)
    return list(packed)
def _readline(ser=None):
    """Read one frame from the serial port, one byte at a time.

    A frame is considered complete when the last five bytes received are all
    201 and the total length is 11 or 24 bytes.
    NOTE(review): presumably 11 = 5 header + 1 status + 5 trailer (the
    acknowledgement) and 24 = 5 + 14 + 5 (the state reply) - confirm against
    the firmware.

    Args:
        ser: Object with a ``read(n)`` method returning bytes (e.g. an open
            serial port).  ``None`` selects debug mode.

    Returns:
        A ``bytearray`` with everything read so far.  Reading stops early,
        returning a possibly incomplete frame, as soon as ``ser.read(1)``
        yields nothing (e.g. on timeout).  In debug mode the function sleeps
        50 ms and returns a canned 54-byte frame: five 200s, 44 zeros,
        five 201s.
    """
    terminator = bytearray([201] * 5)
    if ser is None:
        time.sleep(.05)
        return bytearray([200] * 5) + bytearray(44) + terminator

    line = bytearray()
    while True:
        c = ser.read(1)
        if not c:
            # Nothing arrived: hand back whatever we have.
            break
        line += c
        if line[-5:] == terminator:
            if len(line) in (11, 24):
                break
            # Trailer seen at an unexpected length: report it and keep
            # accumulating.  Single-argument print works on Python 2 and 3.
            print("%s %s" % ("improper line length: ", len(line)))
    return line
def write(cmd, ser=None):
    """Frame a command, send it, and wait for the acknowledgement.

    The payload is followed by a one-byte checksum (sum of the payload bytes
    modulo 256) and wrapped between five 200 bytes and five 201 bytes.  The
    reply is read with ``_readline`` and must carry the value 6 at index 5.

    Args:
        cmd: ``bytearray`` holding the command payload.  It is not modified.
        ser: Open serial port, or ``None`` for debug mode.

    Returns:
        ``0`` in debug mode (nothing is sent); ``None`` once the command has
        been acknowledged.

    Raises:
        SystemExit: If the acknowledgement is missing or wrong.
            NOTE(review): the exit status is 0 even though this is a failure
            path; kept as-is for compatibility.
    """
    if ser is None:
        return 0

    # Build a new buffer instead of ``cmd += ...``: in-place concatenation on
    # a bytearray would append the checksum to the caller's object.
    payload = cmd + bytearray([sum(cmd) % 256])  # add simple checksum
    frame = bytearray([200] * 5) + payload + bytearray([201] * 5)
    ser.write(frame)

    reply = _readline(ser=ser)
    if len(reply) < 6 or reply[5] != 6:
        # Single-argument prints behave the same on Python 2 and 3.
        print("didn't get checksum acknowledgement!")
        print("%s %s" % ('return: ', list(reply)))
        sys.exit(0)
def request_state_command():
    """Return the one-byte payload (value 110) used to request the state."""
    opcode = 110
    return bytearray([opcode])
def request_state(ser=None):
    """Send the state-request command through ``write``.

    Args:
        ser: Serial port forwarded to ``write``; ``None`` selects debug mode.

    Returns:
        None.  The result of ``write`` is intentionally not propagated.
    """
    command = request_state_command()
    write(command, ser=ser)
def read_state(ser=None):
    """Request the state and decode the reply.

    Sends the state request, reads one frame with ``_readline``, checks that
    it begins with five 200 bytes, and decodes the first 14 bytes after that
    header as seven little-endian unsigned 16-bit integers.

    Args:
        ser: Serial port forwarded to the helpers; ``None`` selects debug mode.

    Returns:
        A list of seven integers, or ``None`` (after printing the offending
        bytes) when any of them could not be decoded.

    Raises:
        SystemExit: If the reply does not start with the 5-byte header.
            NOTE(review): the exit status is 0 even though this is a failure
            path; kept as-is for compatibility.
    """
    request_state(ser=ser)
    line = _readline(ser=ser)

    # A slice shorter than five bytes can never equal the header, so this
    # also covers replies that are too short.
    if line[:5] != bytearray([200] * 5):
        # Show the raw bytes; decoding the whole frame as a single int16
        # almost never succeeds and would just print None.
        print("%s %s" % ('didnt get framing: ', list(line)))
        sys.exit(0)

    # The frame is binary: do not strip '\n' (byte 10) from it, since that
    # can remove real payload bytes and fails on a Python 3 bytearray.
    payload = line[5:]
    items = [read_int16(payload[x:x + 2], signed=False)
             for x in range(0, 7 * 2, 2)]
    print("%s %s" % ('items: ', items))
    if None in items:
        print("%s %s" % ('garbage: ', list(payload)))
        return None
    return items
def pwm_command(cca, ccb, ccc, ccd):
    """Build the payload for command 100 carrying four 16-bit values.

    Args:
        cca, ccb, ccc, ccd: Unsigned 16-bit integers; each is encoded with
            ``encode_int16`` (little-endian) in this order.

    Returns:
        A 9-byte ``bytearray``: the value 100 followed by the four encoded
        arguments.
    """
    payload = [100]
    for value in (cca, ccb, ccc, ccd):
        payload.extend(encode_int16(value, signed=False))
    return bytearray(payload)
def read_angle_command():
    """Return the one-byte payload (value 101) for the read-angle command.

    NOTE(review): what the device does on receiving 101 is not visible in
    this file - confirm against the firmware.
    """
    opcode = 101
    return bytearray([opcode])
def main():
    """Play back a fixed sequence of PWM keyframes over the serial port.

    Opens the serial port (falling back to debug mode with ``ser = None``
    when it cannot be opened), then walks through 50 keyframes scheduled
    0.03 s apart.  For each frame that becomes due it sends the PWM command,
    reads the state, sends the read-angle command, and stores the state in a
    local list.  Playback stops once the elapsed time passes the last
    scheduled time, so frames that are not yet sent by then are dropped.

    Ctrl-C prints ``quitting`` and exits with status 0.  The port is closed
    on every exit path.
    """
    try:
        ser = serial.Serial(port='/dev/tty.usbserial-FT9L3KWR',
                            baudrate=230400, timeout=1.)
    except (OSError, serial.SerialException):
        # no serial port
        # SerialException is listed explicitly because it derives from
        # IOError, which is not a subclass of OSError on Python 2.
        print("Couldn't find the serial port, entering debug mode")
        ser = None

    print("Starting up...")
    N = 50
    dt = .03
    times = [dt * i for i in range(N)]
    pwms = [(i, 2 * i, 2 * i, 2 * i) for i in range(N)]
    data = []
    t0 = time.time()
    current_frame = 0

    def current_time():
        # Seconds elapsed since playback started.
        return time.time() - t0

    try:
        # ``current_frame < N`` guards the index below: without it the loop
        # could look up times[N] after the final frame has been sent.
        while current_frame < N and current_time() <= times[-1]:
            if current_time() >= times[current_frame]:
                print('writing frame %d' % current_frame)
                write(pwm_command(*pwms[current_frame]), ser=ser)
                new_data = read_state(ser=ser)
                write(read_angle_command(), ser=ser)
                data.append(new_data)
                current_frame += 1
    except KeyboardInterrupt:
        print('quitting')
        sys.exit(0)
    finally:
        # Runs on normal completion, on Ctrl-C, and when a helper calls
        # sys.exit(), so the port is never left open.
        if ser is not None:
            ser.close()
#print "Finished sending keyframes"
#write( pwm_command(0,0,0,0,0,0) , ser=ser)
#with open('sewing_data.csv', 'w') as f:
# for l in data:
# f.write( ','.join(map(str,l))+'\n' )
# Run the keyframe playback only when executed as a script, not on import.
if __name__ == '__main__':
    main()