Skip to content

Commit b28fc3e

Browse files
committed
[FLINK-38944][benchmark] Introduce a benchmark on adaptive partition selection for RescalePartitioner and RebalancePartitioner
1 parent 9ac3569 commit b28fc3e

2 files changed

Lines changed: 158 additions & 0 deletions

File tree

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.benchmark;
20+
21+
import org.apache.flink.benchmark.functions.LongSourceType;
22+
import org.apache.flink.benchmark.functions.MultiplyByTwo;
23+
import org.apache.flink.benchmark.functions.SkewableHigherMultiply;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
26+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
27+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
28+
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
29+
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
32+
import org.openjdk.jmh.annotations.Param;
33+
import org.openjdk.jmh.runner.Runner;
34+
import org.openjdk.jmh.runner.RunnerException;
35+
import org.openjdk.jmh.runner.options.Options;
36+
import org.openjdk.jmh.runner.options.OptionsBuilder;
37+
import org.openjdk.jmh.runner.options.VerboseMode;
38+
39+
@OperationsPerInvocation(value = AdaptivePartitionThroughputBenchmark.RECORDS_PER_INVOCATION)
40+
public class AdaptivePartitionThroughputBenchmark extends BenchmarkBase {
41+
public static final int RECORDS_PER_INVOCATION = 150_000;
42+
private static final long CHECKPOINT_INTERVAL_MS = 100;
43+
44+
@Param({"F27_UNBOUNDED"})
45+
public LongSourceType sourceType;
46+
47+
@Param({"true", "false"})
48+
public boolean adaptivePartitioner;
49+
50+
@Param({"2"})
51+
public int adaptivePartitionerMaxTraverseSize;
52+
53+
public static void main(String[] args) throws RunnerException {
54+
Options options =
55+
new OptionsBuilder()
56+
.verbosity(VerboseMode.NORMAL)
57+
.include(
58+
".*"
59+
+ AdaptivePartitionThroughputBenchmark.class
60+
.getCanonicalName()
61+
+ ".*")
62+
.build();
63+
64+
new Runner(options).run();
65+
}
66+
67+
@Benchmark
68+
public void mapRebalanceMapRescaleMapSink(InputBenchmarkFlinkEnvironmentContext context)
69+
throws Exception {
70+
71+
StreamExecutionEnvironment env = context.env;
72+
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
73+
env.setParallelism(8);
74+
configAdaptivePartitioner(env);
75+
76+
DataStreamSource<Long> source = sourceType.source(env, RECORDS_PER_INVOCATION);
77+
source.map(new MultiplyByTwo())
78+
.rebalance()
79+
.map(new SkewableHigherMultiply(1, 0, 5))
80+
.rescale()
81+
.map(new SkewableHigherMultiply(1, 0, 5))
82+
.sinkTo(new DiscardingSink<>());
83+
84+
env.execute();
85+
}
86+
87+
private void configAdaptivePartitioner(StreamExecutionEnvironment env) {
88+
Configuration config = new Configuration();
89+
config.set(NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_ENABLE, adaptivePartitioner);
90+
config.set(
91+
NettyShuffleEnvironmentOptions.ADAPTIVE_PARTITIONER_MAX_TRAVERSE_SIZE,
92+
adaptivePartitionerMaxTraverseSize);
93+
config.setString("restart-strategy", "fixed-delay");
94+
config.setString("restart-strategy.fixed-delay.attempts", "15000000");
95+
config.setString("restart-strategy.fixed-delay.delay", "3s");
96+
env.configure(config);
97+
}
98+
99+
public static class InputBenchmarkFlinkEnvironmentContext extends FlinkEnvironmentContext {
100+
101+
@Override
102+
protected int getNumberOfSlotsPerTaskManager() {
103+
return 16;
104+
}
105+
}
106+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.benchmark.functions;
20+
21+
import org.apache.flink.api.common.functions.RichMapFunction;
22+
23+
import java.util.Arrays;
24+
import java.util.HashSet;
25+
import java.util.Set;
26+
27+
public class SkewableHigherMultiply extends RichMapFunction<Long, Long> {
28+
29+
private final int additionalCostMillisPerRecord;
30+
private final Set<Integer> expectedSkewedSubTasksIndexes;
31+
32+
public SkewableHigherMultiply(
33+
int additionalCostMillisPerRecord, Integer... expectedSkewedSubTasksIndexes) {
34+
this.additionalCostMillisPerRecord = additionalCostMillisPerRecord;
35+
this.expectedSkewedSubTasksIndexes =
36+
new HashSet<>(Arrays.asList(expectedSkewedSubTasksIndexes));
37+
}
38+
39+
@Override
40+
public Long map(Long value) throws Exception {
41+
double base = value * 2.0d;
42+
if (additionalCostMillisPerRecord <= 0) {
43+
return (long) base;
44+
} else {
45+
if (expectedSkewedSubTasksIndexes.contains(
46+
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())) {
47+
Thread.sleep(additionalCostMillisPerRecord);
48+
}
49+
return (long) base;
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)