Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import math
import serial
import time
import multiprocessing
import logging
BAUDRATE_DEFAULT = 921600
class Module:
def __init__(self, port, baudrate=BAUDRATE_DEFAULT):
self.port = None
self.baudrate = baudrate
try:
self.port = serial.Serial(port, baudrate)
except serial.SerialException:
logging.error(f"Cannot connect to {port}")
@property
def connected(self):
return self.port is not None
def write(self, txt):
self.port.write(txt)
def close(self):
self.port.close()
class Stepper(Module):
def __init__(self, steps_per_unit, port, baudrate=BAUDRATE_DEFAULT):
super().__init__(port, baudrate)
self.steps = 0
self.steps_per_unit = steps_per_unit
def step(self, forward):
self.write(b"f" if forward else b"r")
self.steps += 1 if forward else -1
class Servo(Module):
def __init__(self, port, baudrate=BAUDRATE_DEFAULT):
super().__init__(port, baudrate)
self.delay_us = 0
def pulse(self, delay_us):
self.write(delay_us.to_bytes(2, byteorder='little'))
def norm(pos1, pos2):
return math.sqrt(sum([(a - b) * (a - b) for a, b in zip(pos1, pos2)]))
class Action:
pass
class PathAction(Action):
def __call__(self, t):
raise NotImplementedError()
class Line(PathAction):
def __init__(self, pos1, pos2, feedrate):
self.pos1 = pos1
self.pos2 = pos2
self.duration = norm(pos1, pos2) / feedrate
def __call__(self, t):
if t > self.duration:
# end move
return None
u = t / self.duration
return [a * (1 - u) + b * u for a, b in zip(self.pos1, self.pos2)]
class LineAcc(PathAction):
def __init__(self, pos1, pos2, feedrate_max, acc_max):
self.feedrate = 0
self.feedrate_max = feedrate_max
self.acc_max = acc_max
self.pos1 = pos1
self.pos2 = pos2
self.d = 0
self.t = None
self.d_brake = None
self.dist = norm(pos1, pos2)
def __call__(self, t):
if self.t is None:
self.t = t
if self.d >= self.dist:
# end move
return None
dt = t - self.t
if self.d < self.dist / 2 and self.feedrate < self.feedrate_max:
# accelerate
self.feedrate += self.acc_max * dt
else:
if self.d_brake is None:
# stopped accelerating, store brake distance
self.d_brake = self.dist - self.d
if self.d > self.d_brake:
# decelerate
self.feedrate -= self.acc_max * dt
# update state
self.d += self.feedrate * dt
self.t = t
# interpolate
u = self.d / self.dist
return [a * (1 - u) + b * u for a, b in zip(self.pos1, self.pos2)]
def modulesManager(action_queue, modules_config, axis_names):
logging.info("start loop")
modules = {}
for name, config in modules_config.items():
if config["type"] == "stepper":
modules[name] = Stepper(config["steps_per_unit"],
config["port"],
config["baudrate"])
elif config["type"] == "servo":
modules[name] = Servo(config["port"],
config["baudrate"])
modules_axis = [modules[name] for name in axis_names]
while True:
if not action_queue.empty():
action = action_queue.get()
if isinstance(action, PathAction):
# time in s, ~us resolution
t0 = time.perf_counter()
while True:
t = time.perf_counter()
# path is a time function
pos = action(t - t0)
if pos is None:
# done
break
for p, motor in zip(pos, modules_axis):
steps = int(p * motor.steps_per_unit)
if motor.steps < steps:
motor.step(True)
elif motor.steps > steps:
motor.step(False)
def main():
multiprocessing.set_start_method('spawn')
action_queue = multiprocessing.Queue()
modules_config = {
"x": {
"type": "stepper",
"steps_per_unit": 6400
},
"y": {
"type": "stepper",
}
p1 = multiprocessing.Process(target=modulesManager, args=(action_queue, modules_config, ["x", "y"]))
p1.start()
action_queue.put(Line([0, 0], [dist, dist], feedrate))
action_queue.put(Line([dist, dist], [2*dist, 0], feedrate))
action_queue.put(Line([2*dist, 0], [dist, -dist], feedrate))
action_queue.put(Line([dist, -dist], [0, 0], feedrate))
# action_queue.put(LineAcc([0, 0], [dist, dist], feedrate, acc))
# action_queue.put(LineAcc([dist, dist], [2*dist, 0], feedrate, acc))
# action_queue.put(LineAcc([2*dist, 0], [dist, -dist], feedrate, acc))
# action_queue.put(LineAcc([dist, -dist], [0, 0], feedrate, acc))
# action_queue.put(LineAcc([0, 0], [dist, dist], feedrate, acc))
# action_queue.put(LineAcc([dist, dist], [2 * dist, 0], feedrate, acc))
# action_queue.put(LineAcc([2 * dist, 0], [dist, -dist], feedrate, acc))
# action_queue.put(LineAcc([dist, -dist], [0, 0], feedrate, acc))
if __name__ == "__main__":
main()