Skip to content
Snippets Groups Projects
Select Git revision
  • 499a5c5559c7d48a6896eccd1da75f9c10bff9cb
  • master default protected
2 results

ximage

Blame
  • serialtimetest.4.multi.py 2.76 KiB
    #
    # serialtimetest.4.multi.py
    #    serial speed time test, four ports, multiprocessing version
    #
    # Neil Gershenfeld  5/8/21
    # This work may be reproduced, modified, distributed,
    # performed, and displayed for any purpose, but must
    # acknowledge this project. Copyright is retained and
    # must be preserved. The work is provided as is; no
    # warranty is provided, and users accept all liability.
    #
    import serial,sys,time,signal,multiprocessing
    if (len(sys.argv) != 7):
       print("command line: serialtest.4.multi.py port0 port1 port 2 port3 speed delay")
       sys.exit()
    device0 = sys.argv[1]
    device1 = sys.argv[2]
    device2 = sys.argv[3]
    device3 = sys.argv[4]
    baud = int(sys.argv[5])
    delay = float(sys.argv[6])
    print('open '+device0+' at '+str(baud)+' delay '+str(delay))
    port0 = serial.Serial(device0,baudrate=baud,timeout=0)
    print('open '+device1+' at '+str(baud)+' delay '+str(delay))
    port1 = serial.Serial(device1,baudrate=baud,timeout=0)
    print('open '+device2+' at '+str(baud)+' delay '+str(delay))
    port2 = serial.Serial(device2,baudrate=baud,timeout=0)
    print('open '+device3+' at '+str(baud)+' delay '+str(delay))
    port3 = serial.Serial(device3,baudrate=baud,timeout=0)
    ports = [port0,port1,port2,port3]
    '''
    #
    # event version
    #
    evt = multiprocessing.Event()
    #
    # worker event handler
    #
    def worker(id,evt):
       while (1):
          while (1):
             if (evt.is_set()):
                break
          ports[id].write(msg)
          while (1):
             if (not evt.is_set()):
                break
          ports[id].write(msg)
    #
    # alarm event handler
    #
    def handler(signum,stack):
       if (evt.is_set()):
          evt.clear()
       else:
          evt.set()
    #
    # start workers
    #
    p0 = multiprocessing.Process(target=worker,args=(0,evt,))
    p0.start()
    p1 = multiprocessing.Process(target=worker,args=(1,evt,))
    p1.start()
    p2 = multiprocessing.Process(target=worker,args=(2,evt,))
    p2.start()
    p3 = multiprocessing.Process(target=worker,args=(3,evt,))
    p3.start()
    '''
    #
    # Value version
    #
    val = multiprocessing.Value('i',0,lock=False)
    #
    # worker event handler
    #
    def worker(id,val):
       while (1):
          while (1):
             if (val.value == 0):
                break
          ports[id].write(id.to_bytes(1,'big'))
          while (1):
             if (val.value == 1):
                break
          ports[id].write(id.to_bytes(1,'big'))
    #
    # alarm event handler
    #
    def handler(signum,stack):
       if (val.value == 0):
          val.value = 1
       else:
          val.value = 0
    #
    # start workers
    #
    p0 = multiprocessing.Process(target=worker,args=(0,val,))
    p0.start()
    p1 = multiprocessing.Process(target=worker,args=(1,val,))
    p1.start()
    p2 = multiprocessing.Process(target=worker,args=(2,val,))
    p2.start()
    p3 = multiprocessing.Process(target=worker,args=(3,val,))
    p3.start()
    #
    # start alarm
    #
    signal.signal(signal.SIGALRM,handler)
    signal.setitimer(signal.ITIMER_REAL,delay,delay)
    #
    # wait for alarms
    #
    while (1):
       0