Skip to content
Snippets Groups Projects
Commit 9e955a10 authored by Quentin Bolsee's avatar Quentin Bolsee
Browse files

code

parent 82833a27
Branches
No related tags found
No related merge requests found
from cobs import cobs
import serial
class CobsUsbSerial:
def __init__(self, port, baudrate=115200):
self.port = port
self.baudrate = baudrate
self.ser = serial.Serial(port, baudrate=baudrate)
def write(self, data: bytes):
data_enc = cobs.encode(data) + b"\x00"
self.ser.write(data_enc)
def read(self):
data_enc = self.ser.read_until(b"\x00")
data = cobs.decode(data_enc[:-1])
return data
......@@ -7,5 +7,11 @@
"3": "img/3.png",
"2": "img/2.png",
"1": "img/1.png"
},
"ports": {
"solenoid": "4150325537323119",
"y_motor": "415032383337320B",
"x_bot_motor": "4150323833373205",
"x_top_motor": "4150323833373008"
}
}
import cv2
import numpy as np
import matplotlib.pyplot as plt
from skimage import morphology, measure
import trace_skeleton
import random
......@@ -9,10 +8,10 @@ import random
def bf(img):
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
gray = cv2.equalizeHist(gray)
filtered = cv2.bilateralFilter(gray, d=11, sigmaColor=75, sigmaSpace=75)
filtered = cv2.bilateralFilter(gray, d=7, sigmaColor=50, sigmaSpace=50)
# filtered = cv2.Sobel(filtered, ddepth=cv2.CV_8U, dx=1, dy=1, ksize=5)
# gray = cv2.cvtColor(filtered, cv2.COLOR_BGR2GRAY)
edges = cv2.adaptiveThreshold(filtered, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, blockSize=9, C=4)
edges = cv2.adaptiveThreshold(filtered, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, blockSize=11, C=4)
# post_filtered = cv2.bilateralFilter(edges, d=9, sigmaColor=75, sigmaSpace=75)
edges = ~edges
......@@ -36,22 +35,23 @@ def bf(img):
def stroke(img):
img_copy = np.zeros_like(img)
img_copy = np.ones_like(img)*255
img2 = bf(img)
img_float = img2.astype(np.float32) / 255.0
img_binary = img_float > 0.5
im = trace_skeleton.thinning(img_binary)
plt.figure()
rects = []
polys = trace_skeleton.traceSkeleton(im, 0, 0, im.shape[1], im.shape[0], 10, 999, rects)
for l in polys:
c = (200 * random.random(), 200 * random.random(), 200 * random.random())
# c = (200 * random.random(), 200 * random.random(), 200 * random.random())
for i in range(0, len(l) - 1):
cv2.line(img_copy, (l[i][0], l[i][1]), (l[i + 1][0], l[i + 1][1]), c)
return img_copy
# cv2.line(img_copy, (l[i][0], l[i][1]), (l[i + 1][0], l[i + 1][1]), c)
cv2.line(img_copy, (l[i][0], l[i][1]), (l[i + 1][0], l[i + 1][1]), (0, 0, 0))
return img_copy, polys
if __name__ == "__main__":
......@@ -67,34 +67,7 @@ if __name__ == "__main__":
img = cv2.imread("test.png", cv2.IMREAD_UNCHANGED)
img = img[:, :, :3]
# img_copy = np.copy(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))*0
img_copy = np.zeros_like(img)
img2 = bf(img)
# plt.figure()
# plt.imshow(img2)
# plt.show()
img_float = img2.astype(np.float32) / 255.0
img_binary = img_float > 0.5
im = trace_skeleton.thinning(img_binary)
plt.figure()
img_stroke, polys = stroke(img)
rects = []
polys = trace_skeleton.traceSkeleton(im, 0, 0, im.shape[1], im.shape[0], 10, 999, rects)
for l in polys:
c = (200 * random.random(), 200 * random.random(), 200 * random.random())
for i in range(0, len(l) - 1):
cv2.line(img_copy, (l[i][0], l[i][1]), (l[i + 1][0], l[i + 1][1]), c)
# plt.plot((l[i][0], l[i + 1][0]), (l[i][1], l[i + 1][1]))
# plt.axis("equal")
# plt.imshow(img_copy)
# plt.show()
# plt.figure()
# plt.imshow(im)
# plt.show()
cv2.imshow('', img_copy)
cv2.imshow("test", img_stroke)
cv2.waitKey(0)
from motor import Motor, HBridge
import numpy as np
import math
import json
import serial.tools.list_ports
import time
home_rate = 60
home_accel = 2000
home_backoff_time = 0.2
go_rate = 150
go_accel = 2000
use_current = 0.4
origin = (130, 118)
class Machine:
def __init__(self, ports_named):
ports_found = find_ports(ports_named)
self.solenoid = HBridge(ports_found["solenoid"])
self.y_motor = Motor(ports_found["y_motor"])
self.x_bottom_motor = Motor(ports_found["x_bot_motor"])
self.x_top_motor = Motor(ports_found["x_top_motor"])
# steps / unit,
self.y_motor.set_steps_per_unit(200 / 40)
self.x_bottom_motor.set_steps_per_unit(200 / 40)
self.x_top_motor.set_steps_per_unit(200 / 40)
# power up
self.y_motor.set_current(use_current)
self.x_bottom_motor.set_current(use_current)
self.x_top_motor.set_current(use_current)
def home(self):
# home Y ....
# if we're on the switch, backoff:
if self.y_motor.get_states()['limit'] > 0:
self.y_motor.set_target_velocity(home_rate, home_accel)
time.sleep(home_backoff_time)
self.y_motor.set_target_velocity(0, home_accel)
time.sleep(home_backoff_time)
# let's home it,
self.y_motor.set_target_velocity(-home_rate, home_accel)
# then check-wait on states,
while True:
states = self.y_motor.get_states()
if states['limit'] > 0:
break
self.y_motor.set_target_velocity(0, home_accel)
# home X ...
if self.x_bottom_motor.get_states()['limit'] > 0:
self.x_bottom_motor.set_target_velocity(home_rate, home_accel)
self.x_top_motor.set_target_velocity(home_rate, home_accel)
time.sleep(home_backoff_time)
self.x_bottom_motor.set_target_velocity(0, home_accel)
self.x_top_motor.set_target_velocity(0, home_accel)
time.sleep(home_backoff_time)
self.x_bottom_motor.set_target_velocity(-home_rate, home_accel)
self.x_top_motor.set_target_velocity(-home_rate, home_accel)
while True:
states = self.x_bottom_motor.get_states()
if states['limit'] > 0:
break
self.x_bottom_motor.set_target_velocity(0, home_accel)
self.x_top_motor.set_target_velocity(0, home_accel)
time.sleep(home_backoff_time)
# set zeroes,
self.x_bottom_motor.set_position(0)
self.x_top_motor.set_position(0)
self.y_motor.set_position(0)
def pen_action(self, index=0, down=False, delay=0.05):
if index == 2:
self.solenoid.set_power(0.45 if down else 0.0, 0)
self.solenoid.set_power(0.45 if down else 0.0, 1)
else:
self.solenoid.set_power(0.45 if down else 0.0, index)
time.sleep(delay)
def goto(self, x, y, vel=None, accel=None, eps=0.3):
if vel is None:
vel = go_rate
if accel is None:
accel = go_accel
self.x_bottom_motor.set_target_position(x, vel, accel)
self.x_top_motor.set_target_position(x, vel, accel)
self.y_motor.set_target_position(y, vel, accel)
# ... wait while not at point:
while True:
x_states = self.x_top_motor.get_states()
y_states = self.y_motor.get_states()
x_err = abs(x_states["position"] - x)
y_err = abs(y_states["position"] - y)
if x_err <= eps and y_err <= eps:
break
def goto_list(self, x_list, y_list, eps=0.5):
for x, y in zip(x_list, y_list):
self.goto(x, y, eps=eps)
def find_ports(named_serial_numbers):
named_serial_ports = {}
ports = serial.tools.list_ports.comports()
for name, serial_number in named_serial_numbers.items():
for port in ports:
if serial_number == port.serial_number:
named_serial_ports[name] = port.device
return named_serial_ports
if __name__ == "__main__":
with open("config.json", "r") as f:
config = json.load(f)
m = Machine(config["ports"])
m.home()
m.goto(0, origin[1])
m.goto(origin[0], origin[1])
w = 20
h = 20
m.goto(origin[0]+w, origin[1]+h)
m.pen_action(2, True)
m.goto(origin[0]-w, origin[1]+h)
m.goto(origin[0]-w, origin[1]-h)
m.goto(origin[0]+w, origin[1]-h)
m.goto(origin[0]+w, origin[1]+h)
m.pen_action(2, False)
m.goto(0, origin[1])
m.goto(0, 0)
......@@ -18,7 +18,7 @@ STATE_DONE = 6
class MainApplication:
def __init__(self, fullscreen=False, rotate=False):
self.state = STATE_INIT
self.window = cv2.namedWindow("main", cv2.WINDOW_FULLSCREEN if fullscreen else cv2.WINDOW_AUTOSIZE)
self.window = cv2.namedWindow("main", cv2.WINDOW_FULLSCREEN if fullscreen else cv2.WINDOW_KEEPRATIO)
self.video = cv2.VideoCapture(0)
self.width = 480
self.height = 800
......@@ -93,14 +93,12 @@ class MainApplication:
x, y = self.click
self.click = None
if x < self.width // 2:
print("yes")
self.state = STATE_VECTORIZED
else:
print("no")
self.state = STATE_STREAM
elif self.state == STATE_VECTORIZED:
# self.img = filtering.stroke(self.img_captured)
cv2.imwrite("test.png", self.img_captured)
img_strokes = filtering.stroke(self.img_captured)
self.img = cv2.cvtColor(filtering.bf(self.img_captured), cv2.COLOR_GRAY2BGR)
self.state = STATE_DRAWING
# self.running = False
......
from cobs_usb_serial import CobsUsbSerial
import struct
MSG_GET_ID = 7
MSG_ECHO_TEST = 11
MSG_SET_POWER = 20
MSG_SET_TARG_VEL = 21
MSG_SET_TARG_POS = 22
MSG_SET_POSITION = 23
MSG_SET_CURRENT = 24
MSG_GET_STATES = 25
MSG_ACK = 31
MSG_NACK = 32
class Motor:
def __init__(self, port: str):
self.cobs = CobsUsbSerial(port)
self.spu = 200
def get_id(self):
msg = struct.pack('=B', MSG_GET_ID)
self.cobs.write(msg)
ack, device_id = struct.unpack('=BB', self.cobs.read())
return device_id
def set_steps_per_unit(self, spu):
self.spu = spu
def test(self) -> None:
test_bytes = bytes([MSG_ECHO_TEST, 14, 16])
print("testing w/ an echo-packet: ", [byte for byte in test_bytes])
self.cobs.write(test_bytes)
reply = self.cobs.read()
print("test echo returns: ", [byte for byte in reply])
def set_target_velocity(self, velocity: float, accel: float):
msg = struct.pack('=Bff', MSG_SET_TARG_VEL, velocity * self.spu, accel * self.spu)
self.cobs.write(msg)
return struct.unpack('=B', self.cobs.read())
def set_target_position_no_wait(self, position: float, velocity: float, accel: float):
msg = struct.pack('=Bfff', MSG_SET_TARG_POS, position * self.spu, velocity * self.spu, accel * self.spu)
self.cobs.write(msg)
return
def set_target_position(self, position: float, velocity: float, accel: float):
self.set_target_position_no_wait(position, velocity, accel)
return struct.unpack('=B', self.cobs.read())
def set_position(self, position: float):
msg = struct.pack('=Bf', MSG_SET_POSITION, position * self.spu)
self.cobs.write(msg)
return struct.unpack('=B', self.cobs.read())
def set_current(self, current: float):
msg = struct.pack('=Bf', MSG_SET_CURRENT, current)
self.cobs.write(msg)
return struct.unpack('=B', self.cobs.read())
def get_states(self):
self.cobs.write(bytes([MSG_GET_STATES]))
reply = self.cobs.read()
# print("states returns: ", [byte for byte in reply])
# print(reply)
ack, position, velocity, acceleration, limit = struct.unpack('=BfffB', reply)
return {
'ack': ack,
'position': position / self.spu,
'velocity': velocity / self.spu,
'acceleration': acceleration / self.spu,
'limit': limit
}
class HBridge:
def __init__(self, port: str):
self.cobs = CobsUsbSerial(port)
def get_id(self):
msg = struct.pack('=B', MSG_GET_ID)
self.cobs.write(msg)
ack, device_id = struct.unpack('=BB', self.cobs.read())
return device_id
def test(self) -> None:
test_bytes = bytes([MSG_ECHO_TEST, 14, 16])
print("testing w/ an echo-packet: ", [byte for byte in test_bytes])
self.cobs.write(test_bytes)
reply = self.cobs.read()
print("test echo returns: ", [byte for byte in reply])
def set_power(self, current: float, index: int):
msg = struct.pack('=BfB', MSG_SET_POWER, current, index)
self.cobs.write(msg)
return struct.unpack('=B', self.cobs.read())
def get_states(self):
self.cobs.write(bytes([MSG_GET_STATES]))
reply = self.cobs.read()
# print("states returns: ", [byte for byte in reply])
# print(reply)
ack, power_a, power_b = struct.unpack('=Bff', reply)
return {
'ack': ack,
'power_a': power_a,
'power_b': power_b,
}
host/test.png

421 KiB | W: | H:

host/test.png

444 KiB | W: | H:

host/test.png
host/test.png
host/test.png
host/test.png
  • 2-up
  • Swipe
  • Onion skin
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment