Skip to content

Commit

Permalink
Use spawn start method in multiprocessing programs (#11391)
Browse files Browse the repository at this point in the history
* Use `spawn` start method in multiprocessing programs

* Set `spawn` start method in doctest

* Use `with` statement for locks

* Pass multiprocessing context explicitly
  • Loading branch information
XuehaiPan committed May 2, 2024
1 parent 5131e31 commit ea53051
Showing 1 changed file with 53 additions and 26 deletions.
79 changes: 53 additions & 26 deletions sorts/odd_even_transposition_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
synchronization could be used.
"""

from multiprocessing import Lock, Pipe, Process
import multiprocessing as mp

# lock used to ensure that two processes do not access a pipe at the same time
# NOTE This breaks testing on build runner. May work better locally
# process_lock = Lock()
# process_lock = mp.Lock()

"""
The function run by the processes that sorts the list
Expand All @@ -29,36 +29,41 @@
"""


def oe_process(position, value, l_send, r_send, lr_cv, rr_cv, result_pipe):
process_lock = Lock()
def oe_process(
position,
value,
l_send,
r_send,
lr_cv,
rr_cv,
result_pipe,
multiprocessing_context,
):
process_lock = multiprocessing_context.Lock()

# we perform n swaps since after n swaps we know we are sorted
# we *could* stop early if we are sorted already, but it takes as long to
# find out we are sorted as it does to sort the list with this algorithm
for i in range(10):
if (i + position) % 2 == 0 and r_send is not None:
# send your value to your right neighbor
process_lock.acquire()
r_send[1].send(value)
process_lock.release()
with process_lock:
r_send[1].send(value)

# receive your right neighbor's value
process_lock.acquire()
temp = rr_cv[0].recv()
process_lock.release()
with process_lock:
temp = rr_cv[0].recv()

# take the lower value since you are on the left
value = min(value, temp)
elif (i + position) % 2 != 0 and l_send is not None:
# send your value to your left neighbor
process_lock.acquire()
l_send[1].send(value)
process_lock.release()
with process_lock:
l_send[1].send(value)

# receive your left neighbor's value
process_lock.acquire()
temp = lr_cv[0].recv()
process_lock.release()
with process_lock:
temp = lr_cv[0].recv()

# take the higher value since you are on the right
value = max(value, temp)
Expand Down Expand Up @@ -94,39 +99,60 @@ def odd_even_transposition(arr):
>>> odd_even_transposition(unsorted_list) == sorted(unsorted_list + [1])
False
"""
# spawn method is considered safer than fork
multiprocessing_context = mp.get_context("spawn")

process_array_ = []
result_pipe = []
# initialize the list of pipes where the values will be retrieved
for _ in arr:
result_pipe.append(Pipe())
result_pipe.append(multiprocessing_context.Pipe())
# creates the processes
# the first and last process only have one neighbor so they are made outside
# of the loop
temp_rs = Pipe()
temp_rr = Pipe()
temp_rs = multiprocessing_context.Pipe()
temp_rr = multiprocessing_context.Pipe()
process_array_.append(
Process(
multiprocessing_context.Process(
target=oe_process,
args=(0, arr[0], None, temp_rs, None, temp_rr, result_pipe[0]),
args=(
0,
arr[0],
None,
temp_rs,
None,
temp_rr,
result_pipe[0],
multiprocessing_context,
),
)
)
temp_lr = temp_rs
temp_ls = temp_rr

for i in range(1, len(arr) - 1):
temp_rs = Pipe()
temp_rr = Pipe()
temp_rs = multiprocessing_context.Pipe()
temp_rr = multiprocessing_context.Pipe()
process_array_.append(
Process(
multiprocessing_context.Process(
target=oe_process,
args=(i, arr[i], temp_ls, temp_rs, temp_lr, temp_rr, result_pipe[i]),
args=(
i,
arr[i],
temp_ls,
temp_rs,
temp_lr,
temp_rr,
result_pipe[i],
multiprocessing_context,
),
)
)
temp_lr = temp_rs
temp_ls = temp_rr

process_array_.append(
Process(
multiprocessing_context.Process(
target=oe_process,
args=(
len(arr) - 1,
Expand All @@ -136,6 +162,7 @@ def odd_even_transposition(arr):
temp_lr,
None,
result_pipe[len(arr) - 1],
multiprocessing_context,
),
)
)
Expand Down

0 comments on commit ea53051

Please sign in to comment.