Select Git revision
-
Neil Gershenfeld authored
serialtimetest.4.multi.py 2.76 KiB
#
# serialtimetest.4.multi.py
# serial speed time test, four ports, multiprocessing version
#
# Neil Gershenfeld 5/8/21
# This work may be reproduced, modified, distributed,
# performed, and displayed for any purpose, but must
# acknowledge this project. Copyright is retained and
# must be preserved. The work is provided as is; no
# warranty is provided, and users accept all liability.
#
import serial,sys,time,signal,multiprocessing
# Command line: four serial device names, the baud rate shared by all of
# them, and the alarm period (passed to signal.setitimer, i.e. seconds).
if len(sys.argv) != 7:
    # Usage text names this script and lists exactly the six expected arguments.
    print("command line: serialtimetest.4.multi.py port0 port1 port2 port3 speed delay")
    sys.exit()
device0, device1, device2, device3 = sys.argv[1:5]
baud = int(sys.argv[5])
delay = float(sys.argv[6])

# Open the ports in command-line order, announcing each one first.
# timeout=0 is pySerial's non-blocking read setting.
ports = []
for device in (device0, device1, device2, device3):
    print('open '+device+' at '+str(baud)+' delay '+str(delay))
    ports.append(serial.Serial(device, baudrate=baud, timeout=0))

# Keep the individual port names available alongside the list.
port0, port1, port2, port3 = ports
# Disabled alternative: the multiprocessing.Event variant of the workers is
# kept inside this bare triple-quoted string, so it is only a string literal
# and is never executed as code.
# NOTE(review): the disabled worker writes `msg`, which is not defined anywhere
# in the visible file; it would have to be defined before re-enabling this.
'''
#
# event version
#
evt = multiprocessing.Event()
#
# worker event handler
#
def worker(id,evt):
while (1):
while (1):
if (evt.is_set()):
break
ports[id].write(msg)
while (1):
if (not evt.is_set()):
break
ports[id].write(msg)
#
# alarm event handler
#
def handler(signum,stack):
if (evt.is_set()):
evt.clear()
else:
evt.set()
#
# start workers
#
p0 = multiprocessing.Process(target=worker,args=(0,evt,))
p0.start()
p1 = multiprocessing.Process(target=worker,args=(1,evt,))
p1.start()
p2 = multiprocessing.Process(target=worker,args=(2,evt,))
p2.start()
p3 = multiprocessing.Process(target=worker,args=(3,evt,))
p3.start()
'''
#
# Value version
#
val = multiprocessing.Value('i',0,lock=False)
#
# worker event handler
#
def worker(id, val):
    """Write this worker's id byte to its port on each state of ``val``.

    Repeats forever and never returns: spin until ``val.value`` is 0, write
    ``id`` as one byte to ``ports[id]``, spin until ``val.value`` is 1, write
    the same byte again.

    id  -- index into the module-level ``ports`` list; also the value written
    val -- shared object whose ``value`` attribute is polled
    """
    # NOTE(review): assumes each write follows its wait loop (one byte per
    # flag state) rather than sitting inside it - confirm against upstream.
    while True:
        for wanted in (0, 1):
            # Busy-wait until the flag reaches the state for this phase.
            while val.value != wanted:
                pass
            ports[id].write(id.to_bytes(1, 'big'))
#
# alarm event handler
#
def handler(signum, stack):
    """Flip the shared flag ``val``: 0 becomes 1, anything else becomes 0.

    signum, stack -- signal-handler arguments; neither is used.
    """
    val.value = 1 if val.value == 0 else 0
#
# start workers
#
# Launch one worker process per port index (0..3), starting each as soon as
# it is created; every worker receives its index and the shared flag.
launched = []
for index in range(4):
    proc = multiprocessing.Process(target=worker, args=(index, val))
    proc.start()
    launched.append(proc)
# Keep the individual process handles bound to their per-port names.
p0, p1, p2, p3 = launched
#
# start alarm
#
# Route SIGALRM to handler, then arm a repeating real-time timer: the first
# expiry comes after `delay`, and it re-fires every `delay` after that.
signal.signal(signal.SIGALRM, handler)
signal.setitimer(signal.ITIMER_REAL, delay, delay)
#
# wait for alarms
#
# Spin forever so this process stays alive to receive the alarms.
# NOTE(review): this loop keeps a CPU busy; signal.pause() would sleep between
# alarms instead - confirm that would not disturb the timing being measured.
while True:
    pass