diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6daf8562..5969c53a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,7 @@ Changes 5.0.1 ~~~~~ +* FIX: Prevented duplicate or inconsistent profiler output under Python 3.14 when multiprocessing is used. * ENH: Add %%lprun_all for more beginner-friendly profiling in IPython/Jupyter #383 * FIX: mitigate speed regressions introduced in 5.0.0 * ENH: Added capability to combine profiling data both programmatically (``LineStats.__add__()``) and via the CLI (``python -m line_profiler``) (#380, originally proposed in #219) diff --git a/line_profiler/explicit_profiler.py b/line_profiler/explicit_profiler.py index bb7a904a..daff1ff9 100644 --- a/line_profiler/explicit_profiler.py +++ b/line_profiler/explicit_profiler.py @@ -164,13 +164,21 @@ def func4(): The core functionality in this module was ported from :mod:`xdev`. """ import atexit +import multiprocessing import os +import pathlib import sys # This is for compatibility from .cli_utils import boolean, get_python_executable as _python_command from .line_profiler import LineProfiler from .toml_config import ConfigSource +# The first process that enables profiling records its PID here. Child processes +# created via multiprocessing (spawn/forkserver) inherit this environment value, +# which helps prevent helper processes from claiming ownership and clobbering +# output. Standalone subprocess runs should always be able to reset this value. +_OWNER_PID_ENVVAR = 'LINE_PROFILER_OWNER_PID' + class GlobalProfiler: """ @@ -264,8 +272,8 @@ def __init__(self, config=None): self._config = config_source.path self._profile = None + self._owner_pid = None self.enabled = None - # Configs: # - How to toggle the profiler self.setup_config = config_source.conf_dict['setup'] @@ -310,6 +318,28 @@ def enable(self, output_prefix=None): """ Explicitly enables global profiler and controls its settings. """ + self._debug('enable:enter') + # When using multiprocessing start methods like 'spawn'/'forkserver', + # helper processes may import this module. Only register the atexit + # reporting hook (and enable profiling) in real script invocations to + # prevent duplicate/out-of-order output. + if self._is_helper_process_context(): + self._debug('enable:helper-context') + self.enabled = False + return + + if self._should_skip_due_to_owner(): + self._debug('enable:skip-due-to-owner') + self.enabled = False + return + + # Standalone script executions should always claim ownership, even if a + # PID marker was inherited from another process environment. + owner_pid = os.getpid() + os.environ[_OWNER_PID_ENVVAR] = str(owner_pid) + self._owner_pid = owner_pid + self._debug('enable:owner-claimed', owner_pid=owner_pid) + if self._profile is None: # Try to only ever create one real LineProfiler object atexit.register(self.show) @@ -322,6 +352,120 @@ def enable(self, output_prefix=None): if output_prefix is not None: self.output_prefix = output_prefix + def _is_helper_process_context(self): + """ + Determine if this process looks like a multiprocessing helper. + + Helper contexts should never register atexit hooks or claim ownership, + while real script invocations should always be allowed to do so. + """ + argv0 = sys.argv[0] if sys.argv else '' + if self._has_forkserver_env(): + self._debug('helper:forkserver-env', argv0=argv0) + return True + try: + import multiprocessing.spawn as mp_spawn + if getattr(mp_spawn, '_inheriting', False): + self._debug('helper:spawn-inheriting', argv0=argv0) + return True + except Exception: + pass + try: + if multiprocessing.current_process().name != 'MainProcess': + self._debug( + 'helper:non-main-process', + process_name=multiprocessing.current_process().name, + argv0=argv0, + ) + return True + except Exception: + pass + + main_mod = sys.modules.get('__main__') + main_file = getattr(main_mod, '__file__', None) + for candidate in (argv0, main_file): + if candidate: + try: + if pathlib.Path(candidate).exists(): + self._debug('helper:script-detected', candidate=candidate) + return False + except Exception: + continue + + self._debug('helper:no-script-detected', argv0=argv0, main_file=main_file) + return True + + def _should_skip_due_to_owner(self): + """ + In multiprocessing children, respect an inherited owner marker. + + Standalone subprocesses (parent_process is None) should reset ownership, + but fork/spawn children should not clobber a parent owner's outputs. + """ + try: + if multiprocessing.parent_process() is None: + self._debug('owner:no-parent', owner=os.environ.get(_OWNER_PID_ENVVAR)) + return False + except Exception: + return False + + owner = os.environ.get(_OWNER_PID_ENVVAR) + if owner is None: + return False + + try: + owner_pid = int(owner) + if os.getppid() == 1 and owner_pid != os.getpid(): + self._debug('owner:skip-orphan', owner=owner, ppid=os.getppid()) + return True + if os.getppid() == owner_pid and owner_pid != os.getpid(): + try: + start_method = multiprocessing.get_start_method(allow_none=True) + except Exception: + start_method = None + if start_method == 'forkserver': + self._debug( + 'owner:skip-forkserver-child', + owner=owner, + ppid=os.getppid(), + start_method=start_method, + ) + return True + skip = owner_pid != os.getpid() + self._debug('owner:check', owner=owner, skip=skip) + return skip + except Exception: + return False + + def _has_forkserver_env(self): + for key in os.environ: + if key.startswith('FORKSERVER_'): + return True + if key.startswith('MULTIPROCESSING_FORKSERVER'): + return True + return False + + def _debug(self, message, **extra): + if not os.environ.get('LINE_PROFILER_DEBUG'): + return + try: + parent = multiprocessing.parent_process() + parent_pid = parent.pid if parent is not None else None + except Exception: + parent_pid = None + info = { + 'pid': os.getpid(), + 'ppid': os.getppid(), + 'process': getattr(multiprocessing.current_process(), 'name', None), + 'parent_pid': parent_pid, + 'owner_env': os.environ.get(_OWNER_PID_ENVVAR), + 'owner_pid': self._owner_pid, + 'enabled': self.enabled, + } + info.update(extra) + payload = ' '.join(f'{k}={v!r}' for k, v in info.items()) + print(f'[line_profiler debug] {message} {payload}') + def disable(self): """ Explicitly initialize and disable this global profiler. @@ -358,6 +502,14 @@ def show(self): If the implicit setup triggered, then this will be called by :py:mod:`atexit`. """ + self._debug('show:enter') + owner_env = os.environ.get(_OWNER_PID_ENVVAR) + if os.getppid() == 1 and owner_env == str(os.getpid()): + self._debug('show:skip-orphan-owner', owner_env=owner_env) + return + if self._owner_pid is not None and os.getpid() != self._owner_pid: + self._debug('show:skip-non-owner', current_pid=os.getpid()) + return import io import pathlib @@ -366,8 +518,8 @@ def show(self): write_timestamped_text = self.write_config['timestamped_text'] write_lprof = self.write_config['lprof'] + kwargs = {'config': self._config, **self.show_config} if write_stdout: - kwargs = {'config': self._config, **self.show_config} self._profile.print_stats(**kwargs) if write_text or write_timestamped_text: diff --git a/tests/test_complex_case.py b/tests/test_complex_case.py index bd705929..ace7502b 100644 --- a/tests/test_complex_case.py +++ b/tests/test_complex_case.py @@ -95,6 +95,8 @@ def test_varied_complex_invocations(): temp_dpath = stack.enter_context(tempfile.TemporaryDirectory()) stack.enter_context(ub.ChDir(temp_dpath)) env = {} + # Can enable if this breaks again + # env['LINE_PROFILER_DEBUG'] = '1' outpath = case['outpath'] if outpath: diff --git a/tests/test_explicit_profile.py b/tests/test_explicit_profile.py index 038e6582..511509b1 100644 --- a/tests/test_explicit_profile.py +++ b/tests/test_explicit_profile.py @@ -141,6 +141,88 @@ def test_explicit_profile_with_environ_on(): assert (temp_dpath / 'profile_output.lprof').exists() +def test_explicit_profile_ignores_inherited_owner_marker(): + """ + Standalone runs should not be blocked by an inherited owner marker. + """ + with tempfile.TemporaryDirectory() as tmp: + temp_dpath = ub.Path(tmp) + env = os.environ.copy() + env['LINE_PROFILE'] = '1' + env['LINE_PROFILER_OWNER_PID'] = str(os.getpid() + 100000) + env['PYTHONPATH'] = os.getcwd() + + with ub.ChDir(temp_dpath): + + script_fpath = ub.Path('script.py') + script_fpath.write_text(_demo_explicit_profile_script()) + + args = [sys.executable, os.fspath(script_fpath)] + proc = ub.cmd(args, env=env) + print(proc.stdout) + print(proc.stderr) + proc.check_returncode() + + assert (temp_dpath / 'profile_output.txt').exists() + assert (temp_dpath / 'profile_output.lprof').exists() + + +def test_explicit_profile_process_pool_forkserver(): + """ + Ensure explicit profiler works with forkserver ProcessPoolExecutor. + """ + import multiprocessing as mp + if 'forkserver' not in mp.get_all_start_methods(): + pytest.skip('forkserver start method not available') + with tempfile.TemporaryDirectory() as tmp: + temp_dpath = ub.Path(tmp) + env = os.environ.copy() + env['LINE_PROFILE'] = '1' + # env['LINE_PROFILER_DEBUG'] = '1' + env['PYTHONPATH'] = os.getcwd() + + with ub.ChDir(temp_dpath): + + script_fpath = ub.Path('script.py') + script_fpath.write_text(ub.codeblock( + ''' + import multiprocessing as mp + from concurrent.futures import ProcessPoolExecutor + from line_profiler import profile + + def worker(x): + return x * x + + @profile + def run(): + total = 0 + for i in range(1000): + total += i % 7 + with ProcessPoolExecutor(max_workers=2) as ex: + list(ex.map(worker, range(4))) + return total + + def main(): + if 'forkserver' in mp.get_all_start_methods(): + mp.set_start_method('forkserver', force=True) + run() + + if __name__ == '__main__': + main() + ''').strip()) + + args = [sys.executable, os.fspath(script_fpath)] + proc = ub.cmd(args, env=env) + print(proc.stdout) + print(proc.stderr) + proc.check_returncode() + + output_path = temp_dpath / 'profile_output.txt' + assert output_path.exists() + assert output_path.stat().st_size > 100 + assert proc.stdout.count('Wrote profile results to profile_output.txt') == 1 + + def test_explicit_profile_with_environ_off(): """ When LINE_PROFILE is falsy, profiling should not run.