-
Notifications
You must be signed in to change notification settings - Fork 581
optimize(api): add warning logs for load-based request rejection #2972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -25,6 +25,8 @@ | |||||||||||||||||||||
| import org.apache.hugegraph.define.WorkLoad; | ||||||||||||||||||||||
| import org.apache.hugegraph.util.Bytes; | ||||||||||||||||||||||
| import org.apache.hugegraph.util.E; | ||||||||||||||||||||||
| import org.apache.hugegraph.util.Log; | ||||||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import com.google.common.collect.ImmutableSet; | ||||||||||||||||||||||
| import com.google.common.util.concurrent.RateLimiter; | ||||||||||||||||||||||
|
|
@@ -43,6 +45,8 @@ | |||||||||||||||||||||
| @PreMatching | ||||||||||||||||||||||
| public class LoadDetectFilter implements ContainerRequestFilter { | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static final Logger LOG = Log.logger(LoadDetectFilter.class); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static final Set<String> WHITE_API_LIST = ImmutableSet.of( | ||||||||||||||||||||||
| "", | ||||||||||||||||||||||
| "apis", | ||||||||||||||||||||||
|
|
@@ -59,6 +63,20 @@ public class LoadDetectFilter implements ContainerRequestFilter { | |||||||||||||||||||||
| @Context | ||||||||||||||||||||||
| private jakarta.inject.Provider<WorkLoad> loadProvider; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public static boolean isWhiteAPI(ContainerRequestContext context) { | ||||||||||||||||||||||
| List<PathSegment> segments = context.getUriInfo().getPathSegments(); | ||||||||||||||||||||||
| E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'", | ||||||||||||||||||||||
| context.getUriInfo().getPath()); | ||||||||||||||||||||||
| String rootPath = segments.get(0).getPath(); | ||||||||||||||||||||||
| return WHITE_API_LIST.contains(rootPath); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static void gcIfNeeded() { | ||||||||||||||||||||||
| if (GC_RATE_LIMITER.tryAcquire(1)) { | ||||||||||||||||||||||
| System.gc(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @Override | ||||||||||||||||||||||
| public void filter(ContainerRequestContext context) { | ||||||||||||||||||||||
| if (LoadDetectFilter.isWhiteAPI(context)) { | ||||||||||||||||||||||
|
|
@@ -70,7 +88,12 @@ public void filter(ContainerRequestContext context) { | |||||||||||||||||||||
| int maxWorkerThreads = config.get(ServerOptions.MAX_WORKER_THREADS); | ||||||||||||||||||||||
| WorkLoad load = this.loadProvider.get(); | ||||||||||||||||||||||
| // There will be a thread doesn't work, dedicated to statistics | ||||||||||||||||||||||
| if (load.incrementAndGet() >= maxWorkerThreads) { | ||||||||||||||||||||||
| int currentLoad = load.incrementAndGet(); | ||||||||||||||||||||||
| if (currentLoad >= maxWorkerThreads) { | ||||||||||||||||||||||
| LOG.warn("Rejected request due to high worker load, method={}, path={}, " + | ||||||||||||||||||||||
| "currentLoad={}, maxWorkerThreads={}", | ||||||||||||||||||||||
| context.getMethod(), context.getUriInfo().getPath(), | ||||||||||||||||||||||
| currentLoad, maxWorkerThreads); | ||||||||||||||||||||||
| throw new ServiceUnavailableException(String.format( | ||||||||||||||||||||||
| "The server is too busy to process the request, " + | ||||||||||||||||||||||
| "you can config %s to adjust it or try again later", | ||||||||||||||||||||||
|
|
@@ -84,6 +107,10 @@ public void filter(ContainerRequestContext context) { | |||||||||||||||||||||
| allocatedMem) / Bytes.MB; | ||||||||||||||||||||||
| if (presumableFreeMem < minFreeMemory) { | ||||||||||||||||||||||
| gcIfNeeded(); | ||||||||||||||||||||||
| LOG.warn("Rejected request due to low free memory, method={}, path={}, " + | ||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
建议同时输出 GC 前/后快照,定位会更直观:
Suggested change
|
||||||||||||||||||||||
| "presumableFreeMemMB={}, minFreeMemoryMB={}", | ||||||||||||||||||||||
| context.getMethod(), context.getUriInfo().getPath(), | ||||||||||||||||||||||
| presumableFreeMem, minFreeMemory); | ||||||||||||||||||||||
| throw new ServiceUnavailableException(String.format( | ||||||||||||||||||||||
| "The server available memory %s(MB) is below than " + | ||||||||||||||||||||||
| "threshold %s(MB) and can't process the request, " + | ||||||||||||||||||||||
|
|
@@ -92,18 +119,4 @@ public void filter(ContainerRequestContext context) { | |||||||||||||||||||||
| ServerOptions.MIN_FREE_MEMORY.name())); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public static boolean isWhiteAPI(ContainerRequestContext context) { | ||||||||||||||||||||||
| List<PathSegment> segments = context.getUriInfo().getPathSegments(); | ||||||||||||||||||||||
| E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'", | ||||||||||||||||||||||
| context.getUriInfo().getPath()); | ||||||||||||||||||||||
| String rootPath = segments.get(0).getPath(); | ||||||||||||||||||||||
| return WHITE_API_LIST.contains(rootPath); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static void gcIfNeeded() { | ||||||||||||||||||||||
| if (GC_RATE_LIMITER.tryAcquire(1)) { | ||||||||||||||||||||||
| System.gc(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hugegraph.unit.api.filter; | ||
|
|
||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.commons.configuration2.Configuration; | ||
| import org.apache.commons.configuration2.PropertiesConfiguration; | ||
| import org.apache.hugegraph.api.filter.LoadDetectFilter; | ||
| import org.apache.hugegraph.config.HugeConfig; | ||
| import org.apache.hugegraph.config.ServerOptions; | ||
| import org.apache.hugegraph.define.WorkLoad; | ||
| import org.apache.hugegraph.testutil.Assert; | ||
| import org.apache.hugegraph.unit.BaseUnitTest; | ||
| import org.junit.Before; | ||
| import org.junit.Test; | ||
| import org.mockito.Mockito; | ||
|
|
||
| import jakarta.inject.Provider; | ||
| import jakarta.ws.rs.ServiceUnavailableException; | ||
| import jakarta.ws.rs.container.ContainerRequestContext; | ||
| import jakarta.ws.rs.core.PathSegment; | ||
| import jakarta.ws.rs.core.UriInfo; | ||
|
|
||
| public class LoadDetectFilterTest extends BaseUnitTest { | ||
|
|
||
| private LoadDetectFilter loadDetectFilter; | ||
| private ContainerRequestContext requestContext; | ||
| private UriInfo uriInfo; | ||
| private WorkLoad workLoad; | ||
|
|
||
| @Before | ||
| public void setup() { | ||
| this.requestContext = Mockito.mock(ContainerRequestContext.class); | ||
| this.uriInfo = Mockito.mock(UriInfo.class); | ||
| this.workLoad = new WorkLoad(); | ||
|
|
||
| Mockito.when(this.requestContext.getUriInfo()).thenReturn(this.uriInfo); | ||
| Mockito.when(this.requestContext.getMethod()).thenReturn("GET"); | ||
|
|
||
| this.loadDetectFilter = new LoadDetectFilter(); | ||
| injectProvider(this.loadDetectFilter, "loadProvider", () -> this.workLoad); | ||
| injectProvider(this.loadDetectFilter, "configProvider", | ||
| () -> createConfig(8, 0)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFilter_WhiteListPathIgnored() { | ||
| setupPath("", List.of("")); | ||
| this.workLoad.incrementAndGet(); | ||
|
|
||
| this.loadDetectFilter.filter(this.requestContext); | ||
|
|
||
| Assert.assertEquals(1, this.workLoad.get().get()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFilter_RejectsWhenWorkerLoadIsTooHigh() { | ||
| setupPath("graphs/hugegraph/vertices", | ||
| List.of("graphs", "hugegraph", "vertices")); | ||
| injectProvider(this.loadDetectFilter, "configProvider", | ||
| () -> createConfig(2, 0)); | ||
| this.workLoad.incrementAndGet(); | ||
|
|
||
| ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows( | ||
| ServiceUnavailableException.class, | ||
| () -> this.loadDetectFilter.filter(this.requestContext)); | ||
|
|
||
| Assert.assertContains("The server is too busy to process the request", | ||
| exception.getMessage()); | ||
| Assert.assertContains(ServerOptions.MAX_WORKER_THREADS.name(), | ||
| exception.getMessage()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFilter_RejectsWhenFreeMemoryIsTooLow() { | ||
| setupPath("graphs/hugegraph/vertices", | ||
| List.of("graphs", "hugegraph", "vertices")); | ||
| injectProvider(this.loadDetectFilter, "configProvider", | ||
| () -> createConfig(8, Integer.MAX_VALUE)); | ||
|
|
||
| ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows( | ||
| ServiceUnavailableException.class, | ||
| () -> this.loadDetectFilter.filter(this.requestContext)); | ||
|
|
||
| Assert.assertContains("The server available memory", | ||
| exception.getMessage()); | ||
| Assert.assertContains(ServerOptions.MIN_FREE_MEMORY.name(), | ||
| exception.getMessage()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFilter_AllowsRequestWhenLoadAndMemoryAreHealthy() { | ||
| setupPath("graphs/hugegraph/vertices", | ||
| List.of("graphs", "hugegraph", "vertices")); | ||
| injectProvider(this.loadDetectFilter, "configProvider", | ||
| () -> createConfig(8, 0)); | ||
|
|
||
| this.loadDetectFilter.filter(this.requestContext); | ||
|
|
||
| Assert.assertEquals(1, this.workLoad.get().get()); | ||
| } | ||
|
|
||
| private HugeConfig createConfig(int maxWorkerThreads, int minFreeMemory) { | ||
| Configuration conf = new PropertiesConfiguration(); | ||
| conf.setProperty(ServerOptions.MAX_WORKER_THREADS.name(), maxWorkerThreads); | ||
| conf.setProperty(ServerOptions.MIN_FREE_MEMORY.name(), minFreeMemory); | ||
| return new HugeConfig(conf); | ||
| } | ||
|
|
||
| private void setupPath(String path, List<String> segments) { | ||
| List<PathSegment> pathSegments = segments.stream() | ||
| .map(this::createPathSegment) | ||
| .collect(Collectors.toList()); | ||
| Mockito.when(this.uriInfo.getPath()).thenReturn(path); | ||
| Mockito.when(this.uriInfo.getPathSegments()).thenReturn(pathSegments); | ||
| } | ||
|
|
||
| private PathSegment createPathSegment(String path) { | ||
| PathSegment segment = Mockito.mock(PathSegment.class); | ||
| Mockito.when(segment.getPath()).thenReturn(path); | ||
| return segment; | ||
| } | ||
|
|
||
| private <T> void injectProvider(LoadDetectFilter filter, String fieldName, | ||
| Provider<T> provider) { | ||
| try { | ||
| java.lang.reflect.Field field = LoadDetectFilter.class.getDeclaredField(fieldName); | ||
| field.setAccessible(true); | ||
| field.set(filter, provider); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failed to inject provider: " + fieldName, e); | ||
| } | ||
|
Comment on lines
+141
to
+149
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new WARN logs will be emitted for every rejected request; under overload this can produce a log storm and further degrade the server (I/O, disk, log rotation). Consider rate-limiting/sampling these rejection logs (similar to the existing RateLimiter usage elsewhere) or logging once per interval with aggregated counters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WARN,可能产生大量日志 I/O,进一步放大系统抖动(导入/批量写入期间更明显)。可以给“拒绝日志”加一个轻量限速(只限日志,不限请求处理),这样不会阻塞线程:
// 每秒最多 1 条日志记录
参考:
private static final RateLimiter REJECT_LOG_RATE_LIMITER = RateLimiter.create(1.0);