Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2946,7 +2946,8 @@ package object config {
// When enabled, Spark injects -XX:ActiveProcessorCount=<spark.driver.cores> into the
// driver JVM options, capping the processor count the JVM reports to the configured
// driver core count. (The diff residue here interleaved the old and new .doc() lines,
// which is not valid Scala; this is the post-change definition.)
private[spark] val DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED =
ConfigBuilder("spark.driver.limitActiveProcessorCount.enabled")
.doc("Whether to add -XX:ActiveProcessorCount=<spark.driver.cores> to the driver JVM " +
"options. In YARN cluster mode, the count is set to <spark.driver.cores>. " +
"In local mode, it is applied at submit time via SparkSubmit.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,32 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
checkJavaOptions(driverExtraJavaOptions);

if (isClientMode) {
// Local-mode CPU limiting: when the feature flag is enabled, inject
// -XX:ActiveProcessorCount=<spark.driver.cores> into the driver JVM command so the
// JVM's visible processor count matches the configured driver cores.
if (isLocalMode(config)) {
// NOTE(review): raw key string — assumes it stays in sync with the Scala-side
// DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED config entry; confirm.
String limitAPC = config.get("spark.driver.limitActiveProcessorCount.enabled");
// Literal-first equalsIgnoreCase is null-safe when the key is unset.
if ("true".equalsIgnoreCase(limitAPC)) {
// Skip injection if the user already specified -XX:ActiveProcessorCount explicitly.
boolean alreadySet =
containsActiveProcessorCount(driverDefaultJavaOptions) ||
containsActiveProcessorCount(driverExtraJavaOptions);
if (!alreadySet) {
// Default of 1 when spark.driver.cores is unset (matches the test expectations).
int cores = 1;
String driverCoresStr = config.get("spark.driver.cores");
if (driverCoresStr != null) {
try {
cores = Integer.parseInt(driverCoresStr.trim());
} catch (NumberFormatException e) {
// Fail fast at submit time rather than launching a driver with a bogus flag.
throw new IllegalArgumentException(
"Invalid value for spark.driver.cores: '" + driverCoresStr + "'", e);
}
if (cores < 1) {
throw new IllegalArgumentException(
"spark.driver.cores must be >= 1, got: " + cores);
}
}
cmd.add("-XX:ActiveProcessorCount=" + cores);
}
}
}
// Figuring out where the memory value come from is a little tricky due to precedence.
// Precedence is observed in the following order:
// - explicit configuration (setConf()), which also covers --driver-memory cli argument.
Expand Down Expand Up @@ -487,6 +513,23 @@ boolean isClientMode(Map<String, String> userProps) {
return userMaster == null || userDeployMode == null || "client".equals(userDeployMode);
}

/**
 * Returns whether the effective master resolves to a pure local master: {@code "local"},
 * {@code "local[N]"}, or {@code "local[*]"}. A missing master also counts as local
 * (a null master defaults to local[*]; see the isClientMode comment above).
 * {@code "local-cluster[...]"} is deliberately excluded, since it runs separate
 * worker processes.
 */
private boolean isLocalMode(Map<String, String> userProps) {
  String effectiveMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
  if (effectiveMaster == null) {
    return true;
  }
  String trimmed = effectiveMaster.trim();
  return trimmed.equals("local") || trimmed.startsWith("local[");
}

/**
 * Returns whether the given java-options string already contains an explicit
 * {@code -XX:ActiveProcessorCount} flag, after tokenizing it the same way the
 * launcher does when building the final command.
 */
private static boolean containsActiveProcessorCount(String javaOptions) {
  if (isEmpty(javaOptions)) {
    return false;
  }
  return CommandBuilderUtils.parseOptionString(javaOptions).stream()
    .anyMatch(opt -> opt.startsWith("-XX:ActiveProcessorCount="));
}

/**
* Return whether the given main class represents a thrift server.
*/
Expand Down Expand Up @@ -564,6 +607,7 @@ protected boolean handle(String opt, String value) {
case EXTRA_PROPERTIES_FILE -> extraPropertiesFiles.add(value);
case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true;
case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
case DRIVER_CORES -> conf.put("spark.driver.cores", value); // no SparkLauncher constant
case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
case DRIVER_DEFAULT_CLASS_PATH ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,98 @@ public void testIsClientMode() {
assertTrue(builder.isClientMode(userProps));
}

@Test
// Covers the local-mode -XX:ActiveProcessorCount injection end-to-end through the
// command builder: flag off/on, bare "local" vs "local[N]", core-count sources
// (default, --conf, --driver-cores), non-local masters, user-supplied flags taking
// precedence, and validation failures for bad spark.driver.cores values.
public void testLimitActiveProcessorCountLocalMode() throws Exception {
// Flag disabled (default): no -XX:ActiveProcessorCount in command
List<String> cmd = buildCommand(
parser.MASTER, "local[4]",
SparkLauncher.NO_RESOURCE);
assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount")));

// Flag explicitly disabled: no -XX:ActiveProcessorCount in command
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=false",
SparkLauncher.NO_RESOURCE);
assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount")));

// Local mode (bare "local"), flag enabled, default cores (1)
cmd = buildCommand(
parser.MASTER, "local",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=1"));

// Local mode ("local[4]"), flag enabled, spark.driver.cores unset: count still
// comes from spark.driver.cores (default 1), NOT from the master's thread count.
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=1"));

// Local mode, flag enabled, custom cores via --conf spark.driver.cores=4
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.CONF, "spark.driver.cores=4",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=4"));

// Local mode, flag enabled, custom cores via --driver-cores
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.DRIVER_CORES, "3",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=3"));

// YARN client mode, flag enabled: no -XX:ActiveProcessorCount in command
// (the YARN path applies the flag cluster-side, not at submit time)
cmd = buildCommand(
parser.MASTER, "yarn",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
SparkLauncher.NO_RESOURCE);
assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount")));

// local-cluster mode, flag enabled: no -XX:ActiveProcessorCount in command
// (local-cluster spawns separate worker processes, so it is not "local")
cmd = buildCommand(
parser.MASTER, "local-cluster[2,1,1024]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
SparkLauncher.NO_RESOURCE);
assertFalse(cmd.stream().anyMatch(a -> a.startsWith("-XX:ActiveProcessorCount")));

// User already set -XX:ActiveProcessorCount in extraJavaOptions: skip auto-injection
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.DRIVER_JAVA_OPTIONS, "-XX:ActiveProcessorCount=8",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=8"));
// exactly one occurrence — proves no duplicate flag was injected
assertEquals(1, cmd.stream().filter(a -> a.startsWith("-XX:ActiveProcessorCount")).count());

// User already set -XX:ActiveProcessorCount in defaultJavaOptions: skip auto-injection
cmd = buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.CONF, SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS + "=-XX:ActiveProcessorCount=5",
SparkLauncher.NO_RESOURCE);
assertTrue(cmd.contains("-XX:ActiveProcessorCount=5"));
assertEquals(1, cmd.stream().filter(a -> a.startsWith("-XX:ActiveProcessorCount")).count());

// Invalid spark.driver.cores: should throw
assertThrows(IllegalArgumentException.class, () -> buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.CONF, "spark.driver.cores=abc",
SparkLauncher.NO_RESOURCE));

// Zero cores: should throw
assertThrows(IllegalArgumentException.class, () -> buildCommand(
parser.MASTER, "local[4]",
parser.CONF, "spark.driver.limitActiveProcessorCount.enabled=true",
parser.CONF, "spark.driver.cores=0",
SparkLauncher.NO_RESOURCE));
}

private void testCmdBuilder(boolean isDriver, File propertiesFile) throws Exception {
final String DRIVER_DEFAULT_PARAM = "-Ddriver-default";
final String DRIVER_EXTRA_PARAM = "-Ddriver-extra";
Expand Down Expand Up @@ -530,6 +622,10 @@ private List<String> buildCommand(List<String> args, Map<String, String> env) th
return newCommandBuilder(args).buildCommand(env);
}

/** Convenience overload: builds a command from varargs CLI args with an empty env. */
private List<String> buildCommand(String... args) throws Exception {
  List<String> argList = Arrays.asList(args);
  return buildCommand(argList, new HashMap<>());
}

private void testCLIOpts(String appResource, String opt, List<String> params) throws Exception {
List<String> args = new ArrayList<>();
if (appResource != null) {
Expand All @@ -543,5 +639,4 @@ private void testCLIOpts(String appResource, String opt, List<String> params) th
List<String> cmd = buildCommand(args, env);
assertTrue(cmd.contains(opt), opt + " should be contained in the final cmd.");
}

}