Closed
13 changes: 10 additions & 3 deletions fastcore/parallel.py
@@ -30,7 +30,11 @@ def g(_obj_td, *args, **kwargs):
             _obj_td.result = res
         @wraps(f)
         def _f(*args, **kwargs):
-            res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)
+            if process:
Contributor:
I don't think we want this, do we? I'd expect process=True to give us a normal Process. Why would we want something different on Mac?

Contributor (author):
Disclaimer: my understanding of this is not the best, but I know a bit about concurrency and distributed systems.

My understanding is that what Process does depends on the platform's default start method (which could, I suppose, change in an update, but that's unlikely).

Anyway, I believe that on Linux, Process forks a copy of the current process, while macOS (Apple silicon, and maybe the old Intel chips too) spawns a fresh interpreter.

So with threaded(process=True), Linux ends up with a forked process while macOS does spawn, which carries the implications of a fresh interpreter, re-importing the module, and requiring picklability (which is its own headache).

There is a push toward using spawn, but at the moment picklability plus nested functions will likely lead to errors (which I've encountered).

So the get_context('fork').Process below is the special case that makes process=True behave the same way on macOS as it does on Linux; the point is not to make macOS special, but to keep it from being the odd one out.

Provided that I understand correctly.
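To make the picklability point concrete, here is a minimal sketch (not part of the PR) showing why a spawn-started worker cannot run a nested function, alongside the fork-forcing line the diff uses:

```python
import pickle
import sys
from multiprocessing import Process, get_context

def outer():
    def inner(): return 42   # nested (local) function, not importable by name
    return inner

# spawn pickles the target to send it to the fresh interpreter;
# a nested function fails that pickling step.
try:
    pickle.dumps(outer())
    picklable = True
except (pickle.PicklingError, AttributeError):
    picklable = False
print(picklable)  # False

# The PR's workaround, mirrored: force fork on macOS so it matches Linux.
# A forked child inherits the parent's memory, so nothing needs pickling.
Proc = get_context('fork').Process if sys.platform == 'darwin' else Process
```

The same `pickle.dumps` call succeeds for any top-level function, which is why spawn only bites for interactively defined or nested targets.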

+                Proc = get_context('fork').Process if sys.platform == 'darwin' else Process
+            else:
+                Proc = Thread
+            res = Proc(target=g, args=args, kwargs=kwargs)
             res._args = (res,)+res._args
             res.start()
             return res
@@ -123,7 +127,9 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
     kwpool = {}
     if threadpool: pool = ThreadPoolExecutor
     else:
-        if not method and sys.platform == 'darwin': method='fork'
+        if not method and sys.platform == 'darwin':
Contributor:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES is a good workaround in practice, in our experience. How about we check for that first?

Contributor (author):
That certainly helps. Do we want to force that as the fix, or just raise a UserWarning?

+            # Use fork only if function is defined in __main__ (notebooks/REPL), otherwise use spawn
+            method = 'fork' if getattr(f, '__module__', None) == '__main__' else 'spawn'
         if method: kwpool['mp_context'] = get_context(method)
         pool = ProcessPoolExecutor
     with pool(n_workers, pause=pause, **kwpool) as ex:
@@ -158,7 +164,8 @@ async def limited_task(item):
 # %% ../nbs/03a_parallel.ipynb
 def run_procs(f, f_done, args):
     "Call `f` for each item in `args` in parallel, yielding `f_done`"
-    processes = L(args).map(Process, args=arg0, target=f)
+    Proc = get_context('fork').Process if sys.platform == 'darwin' else Process
Contributor:
fork feels a bit unexpected to me here too.

+    processes = L(args).map(Proc, args=arg0, target=f)
     for o in processes: o.start()
     yield from f_done()
     processes.map(Self.join())
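For reference, the start-method heuristic this diff adds to `parallel` can be written as a standalone sketch (`pick_start_method` is a hypothetical name, not part of the PR):

```python
import sys

def pick_start_method(f, platform=None):
    # Sketch of the diff's heuristic: on macOS, use fork only for functions
    # defined interactively in __main__ (notebook/REPL), which a
    # spawn-started child cannot re-import; anything importable gets the
    # safer spawn. The platform parameter exists only for testability.
    platform = platform or sys.platform
    if platform != 'darwin': return None   # keep the platform default
    return 'fork' if getattr(f, '__module__', None) == '__main__' else 'spawn'
```

In the diff, a non-None result is then passed as `mp_context=get_context(method)` to `ProcessPoolExecutor`.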
13 changes: 10 additions & 3 deletions nbs/03a_parallel.ipynb
@@ -66,7 +66,11 @@
 "            _obj_td.result = res\n",
 "        @wraps(f)\n",
 "        def _f(*args, **kwargs):\n",
-"            res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)\n",
+"            if process:\n",
+"                Proc = get_context('fork').Process if sys.platform == 'darwin' else Process\n",
+"            else:\n",
+"                Proc = Thread\n",
+"            res = Proc(target=g, args=args, kwargs=kwargs)\n",
 "            res._args = (res,)+res._args\n",
 "            res.start()\n",
 "            return res\n",
@@ -414,7 +418,9 @@
 "    kwpool = {}\n",
 "    if threadpool: pool = ThreadPoolExecutor\n",
 "    else:\n",
-"        if not method and sys.platform == 'darwin': method='fork'\n",
+"        if not method and sys.platform == 'darwin':\n",
+"            # Use fork only if function is defined in __main__ (notebooks/REPL), otherwise use spawn\n",
+"            method = 'fork' if getattr(f, '__module__', None) == '__main__' else 'spawn'\n",
 "        if method: kwpool['mp_context'] = get_context(method)\n",
 "        pool = ProcessPoolExecutor\n",
 "    with pool(n_workers, pause=pause, **kwpool) as ex:\n",
@@ -587,7 +593,8 @@
 "#| export\n",
 "def run_procs(f, f_done, args):\n",
 "    \"Call `f` for each item in `args` in parallel, yielding `f_done`\"\n",
-"    processes = L(args).map(Process, args=arg0, target=f)\n",
+"    Proc = get_context('fork').Process if sys.platform == 'darwin' else Process\n",
+"    processes = L(args).map(Proc, args=arg0, target=f)\n",
 "    for o in processes: o.start()\n",
 "    yield from f_done()\n",
 "    processes.map(Self.join())"