Skip to content
Snippets Groups Projects
Commit 1c50d96d authored by Neil Gershenfeld's avatar Neil Gershenfeld
Browse files

wip

parent a0d8851c
Branches
No related tags found
No related merge requests found
#
# serialtimetest.2.multi.py
# serial speed time test, two ports, multiprocessing version
#
# Neil Gershenfeld 4/30/21
# This work may be reproduced, modified, distributed,
# performed, and displayed for any purpose, but must
# acknowledge this project. Copyright is retained and
# must be preserved. The work is provided as is; no
# warranty is provided, and users accept all liability.
#
import serial,sys,time,signal,multiprocessing
if (len(sys.argv) != 5):
print("command line: serialtest.2.multi.py port1 port2 speed delay")
sys.exit()
device1 = sys.argv[1]
device2 = sys.argv[2]
baud = int(sys.argv[3])
delay = float(sys.argv[4])
print('open '+device1+' at '+str(baud)+' delay '+str(delay))
port0 = serial.Serial(device1,baudrate=baud,timeout=0)
print('open '+device2+' at '+str(baud)+' delay '+str(delay))
port1 = serial.Serial(device2,baudrate=baud,timeout=0)
msg = b"\x00"
ports = [port0,port1]
evt = multiprocessing.Event()
#
# worker event handler
#
def worker(id,evt):
while (1):
while (1):
if (evt.is_set()):
break
ports[id].write(msg)
while (1):
if (not evt.is_set()):
break
ports[id].write(msg)
#
# alarm event handler
#
def handler(signum,stack):
if (evt.is_set()):
evt.clear()
else:
evt.set()
#
# start workers
#
p0 = multiprocessing.Process(target=worker,args=(0,evt,))
p0.start()
p1 = multiprocessing.Process(target=worker,args=(1,evt,))
p1.start()
#
# start alarm
#
signal.signal(signal.SIGALRM,handler)
signal.setitimer(signal.ITIMER_REAL,1,delay)
#
# wait for alarms
#
while (1):
0
......@@ -9,45 +9,41 @@
# must be preserved. The work is provided as is; no
# warranty is provided, and users accept all liability.
#
import sys,time,signal,threading
delay = 0.2
'''
import serial,sys,time,signal
import serial,sys,time,signal,threading
if (len(sys.argv) != 5):
print("command line: serialtest.2.py port1 port2 speed delay")
print("command line: serialtest.2.threads.py port1 port2 speed delay")
sys.exit()
device1 = sys.argv[1]
device2 = sys.argv[2]
baud = int(sys.argv[3])
delay = float(sys.argv[4])
print('open '+device1+' at '+str(baud)+' delay '+str(delay))
port1 = serial.Serial(device1,baudrate=baud,timeout=0)
port0 = serial.Serial(device1,baudrate=baud,timeout=0)
print('open '+device2+' at '+str(baud)+' delay '+str(delay))
port2 = serial.Serial(device2,baudrate=baud,timeout=0)
'''
msg0 = b"\x00"
msg1 = b"\xff"
port1 = serial.Serial(device2,baudrate=baud,timeout=0)
msg = b"\x00"
flag = 0
ports = [port0,port1]
#
# thread handler
#
def fn(id):
global v
global v,ports,msg
while (1):
#
# wait for flag to be raised
#
while (1):
if (flag != 0):
print(1,id)
break
ports[id].write(msg)
#
# wait for flag to be lowered
#
while (1):
if (flag == 0):
print(0,id)
break
ports[id].write(msg)
#
# alarm handler
#
......@@ -60,10 +56,10 @@ def handler(signum,stack):
#
# start threads
#
t1 = threading.Thread(target=fn,args=('t1',))
t0 = threading.Thread(target=fn,args=(0,))
t0.start()
t1 = threading.Thread(target=fn,args=(1,))
t1.start()
t2 = threading.Thread(target=fn,args=('t2',))
t2.start()
#
# start alarm
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment