Skip to content
Snippets Groups Projects
Select Git revision
  • 39c28c76aaf635bf1b6a99aecfcc65281a51969f
  • master default protected
2 results

offset.mp4

Blame
  • host.py 4.43 KiB
    #!/usr/bin/env python
    from numpy import cumsum,asarray,arange
    from math import *
    import serial, time
    import struct
    import sys
    import matplotlib as plt
    
    def read_int32(bytes,signed=False):
    	"""Decode a little-endian 32-bit integer from a 4-byte sequence.

    	bytes  -- any value accepted by bytearray() (list of ints, bytearray, ...);
    	          it must contain exactly four bytes.
    	signed -- decode as signed ("<i") when true, unsigned ("<I") otherwise.

    	Returns the decoded int, or None (after printing a diagnostic) when the
    	input cannot be converted or has the wrong length.
    	"""
    	fmt = "<i" if signed else "<I"
    	try:
    		return struct.unpack(fmt,bytearray(bytes))[0]
    	except (struct.error, TypeError, ValueError):
    		# Only decoding failures are handled here; a bare except would also
    		# swallow KeyboardInterrupt/SystemExit raised while decoding.
    		# Two spaces after the colon reproduce the old print-statement output.
    		print("not properly formatted bytes:  %s" % (list(bytes),))
    		return None
    def read_int16(bytes,signed=False):
    	"""Decode a little-endian 16-bit integer from a 2-byte sequence.

    	bytes  -- any value accepted by bytearray() (list of ints, bytearray, ...);
    	          it must contain exactly two bytes.
    	signed -- decode as signed ("<h") when true, unsigned ("<H") otherwise.

    	Returns the decoded int, or None (after printing a diagnostic) when the
    	input cannot be converted or has the wrong length.
    	"""
    	fmt = "<h" if signed else "<H"
    	try:
    		return struct.unpack(fmt,bytearray(bytes))[0]
    	except (struct.error, TypeError, ValueError):
    		# Only decoding failures are handled here; a bare except would also
    		# swallow KeyboardInterrupt/SystemExit raised while decoding.
    		# Two spaces after the colon reproduce the old print-statement output.
    		print("not properly formatted bytes:  %s" % (list(bytes),))
    		return None
    def encode_int32(n,signed=False):
    	"""Pack n as a little-endian 32-bit integer and return its bytes as a list.

    	signed selects the signed ("<i") or unsigned ("<I") struct format.
    	"""
    	fmt = "<i" if signed else "<I"
    	return list(struct.pack(fmt,n))
    def encode_int16(n,signed=False):
    	"""Pack n as a little-endian 16-bit integer and return its bytes as a list.

    	signed selects the signed ("<h") or unsigned ("<H") struct format.
    	"""
    	fmt = "<h" if signed else "<H"
    	return list(struct.pack(fmt,n))
    def encode_char(n,signed=False):
    	"""Pack n as a single byte and return it as a one-element list.

    	signed selects the signed ("<b") or unsigned ("<B") struct format.
    	"""
    	fmt = "<b" if signed else "<B"
    	return list(struct.pack(fmt,n))
    
    def _readline(ser=None):
    	if ser is None:
    		time.sleep(.05)
    		return bytearray([200,200,200,200,200]) + bytearray([0 for i in range(44)]) + bytearray([201,201,201,201,201])
    	else:
    		line = bytearray()
    		while True:
    			c = ser.read(1)
    			if c:
    				line += c
    				if line[-5:] == bytearray([201,201,201,201,201]):
    					if len(line) in [11,24]:
    						break
    					else:
    						print "improper line length: ",len(line)
    			else:
    				break
    	return line
    def write(cmd, ser=None):
    	"""Frame cmd, send it over ser and wait for the acknowledgement.

    	cmd -- command payload as a bytearray (or anything bytearray() accepts).
    	ser -- serial-like object with write(); None means debug mode.

    	The frame sent is: five 200 bytes, the payload, a checksum byte
    	(sum of the payload bytes modulo 256), five 201 bytes.

    	Returns 0 immediately in debug mode (ser is None) and None after an
    	acknowledged send. If the reply is shorter than six bytes or its sixth
    	byte is not 6, the reply is printed and the process exits with status 1.
    	"""
    	if ser is None:
    		return 0
    	# Copy first: `cmd += ...` would append the checksum to the caller's
    	# bytearray in place.
    	payload = bytearray(cmd)
    	payload.append(sum(payload)%256) #add simple checksum
    	frame = bytearray([200,200,200,200,200]) + payload + bytearray([201,201,201,201,201])
    	ser.write(frame)
    	rtrn = _readline(ser=ser)
    	if len(rtrn)<6 or rtrn[5] != 6:
    		print("didn't get checksum acknowledgement!")
    		# Two spaces after the colon reproduce the old print-statement output.
    		print('return:  %s' % (list(rtrn),))
    		# Non-zero status: a missing acknowledgement is a failure.
    		sys.exit(1)
    
    def request_state_command():
    	"""Return the one-byte payload (value 110) used to request the state."""
    	opcode = 110
    	return bytearray([opcode])
    def request_state(ser=None):
    	"""Send the request-state command through write(); returns nothing."""
    	command = request_state_command()
    	write(command,ser=ser)
    
    
    def read_state(ser=None):
    	"""Request the device state and decode the reply.

    	Sends the request-state command, reads one reply with _readline() and,
    	if it starts with the five-byte 200 header, decodes the first 14 bytes
    	after the header as seven little-endian unsigned 16-bit integers.

    	ser -- serial-like object passed through to the helpers; None means
    	       debug mode.

    	Returns the list of seven ints, or None (after printing the offending
    	bytes) when any value could not be decoded. If the header is missing,
    	the raw reply is printed and the process exits with status 1.
    	"""
    	request_state(ser=ser)
    	line = _readline(ser=ser)
    	header = bytearray([200,200,200,200,200])
    	if line[:5] != header:
    		# Show the raw bytes; decoding the whole line as one int16 is
    		# meaningless. Non-zero status because this is a failure.
    		print('didnt get framing:  %s' % (list(line),))
    		sys.exit(1)
    	# The reply is binary, so nothing is stripped: a data byte equal to
    	# 10 ('\n') at either end must be kept.
    	payload = line[5:]
    	items = [ read_int16(payload[x:x+2],signed=False) for x in range(0,7*2,2) ]
    	print('items:  %s' % (items,))
    	if None in items:
    		print('garbage:  %s' % (list(payload),))
    		return None
    	return items
    
    
    def pwm_command(cca,ccb,ccc,ccd):
    	"""Build the PWM command payload: byte 100 followed by four uint16 values.

    	cca, ccb, ccc and ccd are each encoded with encode_int16 as unsigned
    	little-endian 16-bit integers, in that order.
    	"""
    	payload = [100]
    	for value in (cca,ccb,ccc,ccd):
    		payload = payload + encode_int16(value,signed=False)
    	return bytearray(payload)
    def read_angle_command():
    	"""Return the one-byte payload (value 101) of the read-angle command."""
    	opcode = 101
    	return bytearray([opcode])
    
    def main():
    	"""Send a fixed sequence of PWM frames and collect the state after each.

    	Opens the serial port, falling back to debug mode (ser=None) when it
    	cannot be opened. For each of N frames spaced dt seconds apart it waits
    	for the frame's scheduled time, sends the PWM command, reads the state
    	and sends the read-angle command. Ctrl-C prints 'quitting' and exits with
    	status 0. The port is closed on every exit path.
    	"""
    	try:
    		ser = serial.Serial(port='/dev/tty.usbserial-FT9L3KWR',baudrate=230400,timeout=1.)
    	except (OSError, serial.SerialException):
    		# pyserial reports a missing port as SerialException, which derives
    		# from IOError and is not an OSError on Python 2.
    		print("Couldn't find the serial port, entering debug mode")
    		ser = None

    	print("Starting up...")

    	N = 50
    	dt = .03
    	amp = 2**13-1  # only used by the disabled cosine ramp below
    	times  = [dt*i for i in range(N)]#[0, 1., 2., 3.]
    	#f = lambda i: int(amp/2*(1-cos(2*pi*i/(N-1))))
    	pwms = [(i,2*i,2*i,2*i) for i in range(N)]
    	data = []

    	t0 = time.time()
    	try:
    		# Iterating over the schedule sends every frame, including the last,
    		# and can never index past the end of `times`.
    		for current_frame, due in enumerate(times):
    			# Sleep until the frame is due instead of spinning the CPU.
    			remaining = due - (time.time() - t0)
    			if remaining > 0:
    				time.sleep(remaining)
    			print('writing frame %d'%current_frame)
    			write( pwm_command( *pwms[current_frame] ) , ser=ser)
    			new_data = read_state(ser=ser)
    			write( read_angle_command() , ser=ser)
    			data.append(new_data)
    	except KeyboardInterrupt:
    		print('quitting')
    		sys.exit(0)
    	finally:
    		# Runs on normal completion, Ctrl-C and sys.exit() from the helpers.
    		if ser is not None:
    			ser.close()


    	#with open('sewing_data.csv', 'w') as f:
    	#	for l in data:
    	#		f.write( ','.join(map(str,l))+'\n' )
    
    # Run main() only when this file is executed as a script, not on import.
    if __name__ == '__main__':
    	main()