Skip to content
Snippets Groups Projects
main.py 5.71 KiB
Newer Older
  • Learn to ignore specific revisions
  • import math
    
    import serial
    import time
    import multiprocessing
    import logging
    
    BAUDRATE_DEFAULT = 921600
    
    
    class Module:
        def __init__(self, port, baudrate=BAUDRATE_DEFAULT):
            self.port = None
            self.baudrate = baudrate
            try:
                self.port = serial.Serial(port, baudrate)
            except serial.SerialException:
                logging.error(f"Cannot connect to {port}")
    
        @property
        def connected(self):
            return self.port is not None
    
        def write(self, txt):
            self.port.write(txt)
    
        def close(self):
            self.port.close()
    
    
    class Stepper(Module):
        def __init__(self, steps_per_unit, port, baudrate=BAUDRATE_DEFAULT):
            super().__init__(port, baudrate)
            self.steps = 0
            self.steps_per_unit = steps_per_unit
    
        def step(self, forward):
            self.write(b"f" if forward else b"r")
            self.steps += 1 if forward else -1
    
    
    class Servo(Module):
        def __init__(self, port, baudrate=BAUDRATE_DEFAULT):
            super().__init__(port, baudrate)
            self.delay_us = 0
    
        def pulse(self, delay_us):
            self.write(delay_us.to_bytes(2, byteorder='little'))
    
    
    def norm(pos1, pos2):
        return math.sqrt(sum([(a - b) * (a - b) for a, b in zip(pos1, pos2)]))
    
    
    class Action:
        pass
    
    
    class PathAction(Action):
        def __call__(self, t):
            raise NotImplementedError()
    
    
    class Line(PathAction):
        def __init__(self, pos1, pos2, feedrate):
            self.pos1 = pos1
            self.pos2 = pos2
            self.duration = norm(pos1, pos2) / feedrate
    
        def __call__(self, t):
            if t > self.duration:
                # end move
                return None
            u = t / self.duration
            return [a * (1 - u) + b * u for a, b in zip(self.pos1, self.pos2)]
    
    
    class LineAcc(PathAction):
        def __init__(self, pos1, pos2, feedrate_max, acc_max):
            self.feedrate = 0
            self.feedrate_max = feedrate_max
            self.acc_max = acc_max
            self.pos1 = pos1
            self.pos2 = pos2
            self.d = 0
            self.t = None
            self.d_brake = None
            self.dist = norm(pos1, pos2)
    
        def __call__(self, t):
            if self.t is None:
                self.t = t
    
            if self.d >= self.dist:
                # end move
                return None
    
            dt = t - self.t
            if self.d < self.dist / 2 and self.feedrate < self.feedrate_max:
                # accelerate
                self.feedrate += self.acc_max * dt
            else:
                if self.d_brake is None:
                    # stopped accelerating, store brake distance
                    self.d_brake = self.dist - self.d
                if self.d > self.d_brake:
                    # decelerate
                    self.feedrate -= self.acc_max * dt
    
            # update state
            self.d += self.feedrate * dt
            self.t = t
    
            # interpolate
            u = self.d / self.dist
            return [a * (1 - u) + b * u for a, b in zip(self.pos1, self.pos2)]
    
    
    def modulesManager(action_queue, modules_config, axis_names):
        logging.info("start loop")
    
        modules = {}
    
        for name, config in modules_config.items():
            if config["type"] == "stepper":
                modules[name] = Stepper(config["steps_per_unit"],
                                        config["port"],
                                        config["baudrate"])
            elif config["type"] == "servo":
                modules[name] = Servo(config["port"],
                                      config["baudrate"])
    
        modules_axis = [modules[name] for name in axis_names]
    
        while True:
            if not action_queue.empty():
                action = action_queue.get()
                if isinstance(action, PathAction):
                    # time in s, ~us resolution
                    t0 = time.perf_counter()
                    while True:
                        t = time.perf_counter()
    
                        # path is a time function
                        pos = action(t - t0)
    
                        if pos is None:
                            # done
                            break
                        for p, motor in zip(pos, modules_axis):
                            steps = int(p * motor.steps_per_unit)
                            if motor.steps < steps:
                                motor.step(True)
                            elif motor.steps > steps:
                                motor.step(False)
    
    
    def main():
        multiprocessing.set_start_method('spawn')
        action_queue = multiprocessing.Queue()
    
        modules_config = {
            "x": {
                "type": "stepper",
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
                "port": "COM18",
    
                "baudrate": 921600,
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
                "axis": 0,
    
                "steps_per_unit": 6400
            },
            "y": {
                "type": "stepper",
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
                "port": "COM21",
    
                "baudrate": 921600,
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
                "axis": 1,
    
                "steps_per_unit": 6400
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
            }
    
        }
    
        p1 = multiprocessing.Process(target=modulesManager, args=(action_queue, modules_config, ["x", "y"]))
        p1.start()
    
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
        feedrate = 0.8
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
        dist = 1
    
    
        for i in range(10):
    
    Quentin Bolsee's avatar
    Quentin Bolsee committed
            action_queue.put(Line([0, 0], [dist, dist], feedrate))
            action_queue.put(Line([dist, dist], [2*dist, 0], feedrate))
            action_queue.put(Line([2*dist, 0], [dist, -dist], feedrate))
            action_queue.put(Line([dist, -dist], [0, 0], feedrate))
            # action_queue.put(LineAcc([0, 0], [dist, dist], feedrate, acc))
            # action_queue.put(LineAcc([dist, dist], [2*dist, 0], feedrate, acc))
            # action_queue.put(LineAcc([2*dist, 0], [dist, -dist], feedrate, acc))
            # action_queue.put(LineAcc([dist, -dist], [0, 0], feedrate, acc))
    
    
        # action_queue.put(LineAcc([0, 0], [dist, dist], feedrate, acc))
        # action_queue.put(LineAcc([dist, dist], [2 * dist, 0], feedrate, acc))
        # action_queue.put(LineAcc([2 * dist, 0], [dist, -dist], feedrate, acc))
        # action_queue.put(LineAcc([dist, -dist], [0, 0], feedrate, acc))
    
    
    if __name__ == "__main__":
        main()