diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5edb24a6eb3e8..ff887181e1a2e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2946,7 +2946,8 @@ package object config { private[spark] val DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED = ConfigBuilder("spark.driver.limitActiveProcessorCount.enabled") .doc("Whether to add -XX:ActiveProcessorCount= to the driver JVM " + - "options. Currently, this only takes effect in YARN cluster mode.") + "options. In YARN cluster mode, the count is set to the value of spark.driver.cores. " + + "In local mode, it is applied at submit time via SparkSubmit.") .version("4.2.0") .booleanConf .createWithDefault(false) diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index 73336b55b89d4..74669a84bfe59 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -339,6 +339,32 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env) checkJavaOptions(driverExtraJavaOptions); if (isClientMode) { + if (isLocalMode(config)) { + String limitAPC = config.get("spark.driver.limitActiveProcessorCount.enabled"); + if ("true".equalsIgnoreCase(limitAPC)) { + // Skip injection if the user already specified -XX:ActiveProcessorCount explicitly.
+ boolean alreadySet = + containsActiveProcessorCount(driverDefaultJavaOptions) || + containsActiveProcessorCount(driverExtraJavaOptions); + if (!alreadySet) { + int cores = 1; + String driverCoresStr = config.get("spark.driver.cores"); + if (driverCoresStr != null) { + try { + cores = Integer.parseInt(driverCoresStr.trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid value for spark.driver.cores: '" + driverCoresStr + "'", e); + } + if (cores < 1) { + throw new IllegalArgumentException( + "spark.driver.cores must be >= 1, got: " + cores); + } + } + cmd.add("-XX:ActiveProcessorCount=" + cores); + } + } + } // Figuring out where the memory value come from is a little tricky due to precedence. // Precedence is observed in the following order: // - explicit configuration (setConf()), which also covers --driver-memory cli argument. @@ -487,6 +513,23 @@ boolean isClientMode(Map<String, String> userProps) { return userMaster == null || userDeployMode == null || "client".equals(userDeployMode); } + private boolean isLocalMode(Map<String, String> userProps) { + String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER)); + // null master defaults to local[*] (see isClientMode comment above); match "local" and + // "local[N]"/"local[*]" but not "local-cluster[...]" which runs separate worker processes. + return userMaster == null || + userMaster.trim().equals("local") || + userMaster.trim().startsWith("local["); + } + + private static boolean containsActiveProcessorCount(String javaOptions) { + if (isEmpty(javaOptions)) return false; + for (String opt : CommandBuilderUtils.parseOptionString(javaOptions)) { + if (opt.startsWith("-XX:ActiveProcessorCount=")) return true; + } + return false; + } + /** * Return whether the given main class represents a thrift server.
*/ @@ -564,6 +607,7 @@ protected boolean handle(String opt, String value) { case EXTRA_PROPERTIES_FILE -> extraPropertiesFiles.add(value); case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true; case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value); + case DRIVER_CORES -> conf.put("spark.driver.cores", value); // no SparkLauncher constant case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); case DRIVER_DEFAULT_CLASS_PATH -> diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index d1dba85a534f2..366a17bc63d9b 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -388,6 +388,98 @@ public void testIsClientMode() { assertTrue(builder.isClientMode(userProps)); } + @Test + public void testLimitActiveProcessorCountLocalMode() throws Exception { + // Flag disabled (default): no -XX:ActiveProcessorCount in command + List<String> cmd = buildCommand( + parser.MASTER, "local[4]", + SparkLauncher.NO_RESOURCE); + assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount"))); + + // Flag explicitly disabled: no -XX:ActiveProcessorCount in command + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=false", + SparkLauncher.NO_RESOURCE); + assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount"))); + + // Local mode (bare "local"), flag enabled, default cores (1) + cmd = buildCommand( + parser.MASTER, "local", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=1")); + + // Local mode, flag 
enabled, default cores (1) + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=1")); + + // Local mode, flag enabled, custom cores via --conf spark.driver.cores=4 + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.CONF, "spark.driver.cores=4", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=4")); + + // Local mode, flag enabled, custom cores via --driver-cores + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.DRIVER_CORES, "3", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=3")); + + // YARN client mode, flag enabled: no -XX:ActiveProcessorCount in command + cmd = buildCommand( + parser.MASTER, "yarn", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + SparkLauncher.NO_RESOURCE); + assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount"))); + + // local-cluster mode, flag enabled: no -XX:ActiveProcessorCount in command + cmd = buildCommand( + parser.MASTER, "local-cluster[2,1,1024]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + SparkLauncher.NO_RESOURCE); + assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount"))); + + // User already set -XX:ActiveProcessorCount in extraJavaOptions: skip auto-injection + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.DRIVER_JAVA_OPTIONS, "-XX:ActiveProcessorCount=8", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=8")); + assertEquals(1, cmd.stream().filter(a -> a.startsWith("-XX:ActiveProcessorCount")).count()); + + // User already set 
-XX:ActiveProcessorCount in defaultJavaOptions: skip auto-injection + cmd = buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.CONF, SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS + "=-XX:ActiveProcessorCount=5", + SparkLauncher.NO_RESOURCE); + assertTrue(cmd.contains("-XX:ActiveProcessorCount=5")); + assertEquals(1, cmd.stream().filter(a -> a.startsWith("-XX:ActiveProcessorCount")).count()); + + // Invalid spark.driver.cores: should throw + assertThrows(IllegalArgumentException.class, () -> buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.CONF, "spark.driver.cores=abc", + SparkLauncher.NO_RESOURCE)); + + // Zero cores: should throw + assertThrows(IllegalArgumentException.class, () -> buildCommand( + parser.MASTER, "local[4]", + parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true", + parser.CONF, "spark.driver.cores=0", + SparkLauncher.NO_RESOURCE)); + } + private void testCmdBuilder(boolean isDriver, File propertiesFile) throws Exception { final String DRIVER_DEFAULT_PARAM = "-Ddriver-default"; final String DRIVER_EXTRA_PARAM = "-Ddriver-extra"; @@ -530,6 +622,10 @@ private List<String> buildCommand(List<String> args, Map<String, String> env) th return newCommandBuilder(args).buildCommand(env); } + private List<String> buildCommand(String... args) throws Exception { + return buildCommand(Arrays.asList(args), new HashMap<>()); + } + private void testCLIOpts(String appResource, String opt, List<String> params) throws Exception { List<String> args = new ArrayList<>(); if (appResource != null) { @@ -543,5 +639,4 @@ private void testCLIOpts(String appResource, String opt, List<String> params) th List<String> cmd = buildCommand(args, env); assertTrue(cmd.contains(opt), opt + " should be contained in the final cmd."); } - }