# # serialtimetest.4.multi.py # serial speed time test, four ports, multiprocessing version # # Neil Gershenfeld 5/8/21 # This work may be reproduced, modified, distributed, # performed, and displayed for any purpose, but must # acknowledge this project. Copyright is retained and # must be preserved. The work is provided as is; no # warranty is provided, and users accept all liability. # import serial,sys,time,signal,multiprocessing if (len(sys.argv) != 7): print("command line: serialtest.4.multi.py port0 port1 port 2 port3 speed delay") sys.exit() device0 = sys.argv[1] device1 = sys.argv[2] device2 = sys.argv[3] device3 = sys.argv[4] baud = int(sys.argv[5]) delay = float(sys.argv[6]) print('open '+device0+' at '+str(baud)+' delay '+str(delay)) port0 = serial.Serial(device0,baudrate=baud,timeout=0) print('open '+device1+' at '+str(baud)+' delay '+str(delay)) port1 = serial.Serial(device1,baudrate=baud,timeout=0) print('open '+device2+' at '+str(baud)+' delay '+str(delay)) port2 = serial.Serial(device2,baudrate=baud,timeout=0) print('open '+device3+' at '+str(baud)+' delay '+str(delay)) port3 = serial.Serial(device3,baudrate=baud,timeout=0) ports = [port0,port1,port2,port3] ''' # # event version # evt = multiprocessing.Event() # # worker event handler # def worker(id,evt): while (1): while (1): if (evt.is_set()): break ports[id].write(msg) while (1): if (not evt.is_set()): break ports[id].write(msg) # # alarm event handler # def handler(signum,stack): if (evt.is_set()): evt.clear() else: evt.set() # # start workers # p0 = multiprocessing.Process(target=worker,args=(0,evt,)) p0.start() p1 = multiprocessing.Process(target=worker,args=(1,evt,)) p1.start() p2 = multiprocessing.Process(target=worker,args=(2,evt,)) p2.start() p3 = multiprocessing.Process(target=worker,args=(3,evt,)) p3.start() ''' # # Value version # val = multiprocessing.Value('i',0,lock=False) # # worker event handler # def worker(id,val): while (1): while (1): if (val.value == 0): break ports[id].write(id.to_bytes(1,'big')) while (1): if (val.value == 1): break ports[id].write(id.to_bytes(1,'big')) # # alarm event handler # def handler(signum,stack): if (val.value == 0): val.value = 1 else: val.value = 0 # # start workers # p0 = multiprocessing.Process(target=worker,args=(0,val,)) p0.start() p1 = multiprocessing.Process(target=worker,args=(1,val,)) p1.start() p2 = multiprocessing.Process(target=worker,args=(2,val,)) p2.start() p3 = multiprocessing.Process(target=worker,args=(3,val,)) p3.start() # # start alarm # signal.signal(signal.SIGALRM,handler) signal.setitimer(signal.ITIMER_REAL,delay,delay) # # wait for alarms # while (1): 0