diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/LoadDetectFilter.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/LoadDetectFilter.java index 7ee5e7c0f7..a2e2ad9ddf 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/LoadDetectFilter.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/LoadDetectFilter.java @@ -25,6 +25,8 @@ import org.apache.hugegraph.define.WorkLoad; import org.apache.hugegraph.util.Bytes; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; @@ -43,6 +45,8 @@ @PreMatching public class LoadDetectFilter implements ContainerRequestFilter { + private static final Logger LOG = Log.logger(LoadDetectFilter.class); + private static final Set WHITE_API_LIST = ImmutableSet.of( "", "apis", @@ -59,6 +63,20 @@ public class LoadDetectFilter implements ContainerRequestFilter { @Context private jakarta.inject.Provider loadProvider; + public static boolean isWhiteAPI(ContainerRequestContext context) { + List segments = context.getUriInfo().getPathSegments(); + E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'", + context.getUriInfo().getPath()); + String rootPath = segments.get(0).getPath(); + return WHITE_API_LIST.contains(rootPath); + } + + private static void gcIfNeeded() { + if (GC_RATE_LIMITER.tryAcquire(1)) { + System.gc(); + } + } + @Override public void filter(ContainerRequestContext context) { if (LoadDetectFilter.isWhiteAPI(context)) { @@ -70,7 +88,12 @@ public void filter(ContainerRequestContext context) { int maxWorkerThreads = config.get(ServerOptions.MAX_WORKER_THREADS); WorkLoad load = this.loadProvider.get(); // There will be a thread doesn't work, dedicated to statistics - if (load.incrementAndGet() >= maxWorkerThreads) { + int currentLoad = load.incrementAndGet(); + if (currentLoad >= maxWorkerThreads) { + LOG.warn("Rejected request due to high worker load, method={}, path={}, " + + "currentLoad={}, maxWorkerThreads={}", + context.getMethod(), context.getUriInfo().getPath(), + currentLoad, maxWorkerThreads); throw new ServiceUnavailableException(String.format( "The server is too busy to process the request, " + "you can config %s to adjust it or try again later", @@ -84,6 +107,10 @@ public void filter(ContainerRequestContext context) { allocatedMem) / Bytes.MB; if (presumableFreeMem < minFreeMemory) { gcIfNeeded(); + LOG.warn("Rejected request due to low free memory, method={}, path={}, " + + "presumableFreeMemMB={}, minFreeMemoryMB={}", + context.getMethod(), context.getUriInfo().getPath(), + presumableFreeMem, minFreeMemory); throw new ServiceUnavailableException(String.format( "The server available memory %s(MB) is below than " + "threshold %s(MB) and can't process the request, " + @@ -92,18 +119,4 @@ public void filter(ContainerRequestContext context) { ServerOptions.MIN_FREE_MEMORY.name())); } } - - public static boolean isWhiteAPI(ContainerRequestContext context) { - List segments = context.getUriInfo().getPathSegments(); - E.checkArgument(!segments.isEmpty(), "Invalid request uri '%s'", - context.getUriInfo().getPath()); - String rootPath = segments.get(0).getPath(); - return WHITE_API_LIST.contains(rootPath); - } - - private static void gcIfNeeded() { - if (GC_RATE_LIMITER.tryAcquire(1)) { - System.gc(); - } - } } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index a0cb72aa6c..396c081889 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.unit; import org.apache.hugegraph.core.RoleElectionStateMachineTest; +import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest; import org.apache.hugegraph.unit.api.filter.PathFilterTest; import org.apache.hugegraph.unit.cache.CacheManagerTest; import org.apache.hugegraph.unit.cache.CacheTest; @@ -78,6 +79,7 @@ @RunWith(Suite.class) @Suite.SuiteClasses({ /* api filter */ + LoadDetectFilterTest.class, PathFilterTest.class, /* cache */ diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/api/filter/LoadDetectFilterTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/api/filter/LoadDetectFilterTest.java new file mode 100644 index 0000000000..f6a182afe3 --- /dev/null +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/api/filter/LoadDetectFilterTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.unit.api.filter; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.hugegraph.api.filter.LoadDetectFilter; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.config.ServerOptions; +import org.apache.hugegraph.define.WorkLoad; +import org.apache.hugegraph.testutil.Assert; +import org.apache.hugegraph.unit.BaseUnitTest; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import jakarta.inject.Provider; +import jakarta.ws.rs.ServiceUnavailableException; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.PathSegment; +import jakarta.ws.rs.core.UriInfo; + +public class LoadDetectFilterTest extends BaseUnitTest { + + private LoadDetectFilter loadDetectFilter; + private ContainerRequestContext requestContext; + private UriInfo uriInfo; + private WorkLoad workLoad; + + @Before + public void setup() { + this.requestContext = Mockito.mock(ContainerRequestContext.class); + this.uriInfo = Mockito.mock(UriInfo.class); + this.workLoad = new WorkLoad(); + + Mockito.when(this.requestContext.getUriInfo()).thenReturn(this.uriInfo); + Mockito.when(this.requestContext.getMethod()).thenReturn("GET"); + + this.loadDetectFilter = new LoadDetectFilter(); + injectProvider(this.loadDetectFilter, "loadProvider", () -> this.workLoad); + injectProvider(this.loadDetectFilter, "configProvider", + () -> createConfig(8, 0)); + } + + @Test + public void testFilter_WhiteListPathIgnored() { + setupPath("", List.of("")); + this.workLoad.incrementAndGet(); + + this.loadDetectFilter.filter(this.requestContext); + + Assert.assertEquals(1, this.workLoad.get().get()); + } + + @Test + public void testFilter_RejectsWhenWorkerLoadIsTooHigh() { + setupPath("graphs/hugegraph/vertices", + List.of("graphs", "hugegraph", "vertices")); + injectProvider(this.loadDetectFilter, "configProvider", + () -> createConfig(2, 0)); + this.workLoad.incrementAndGet(); + + ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows( + ServiceUnavailableException.class, + () -> this.loadDetectFilter.filter(this.requestContext)); + + Assert.assertContains("The server is too busy to process the request", + exception.getMessage()); + Assert.assertContains(ServerOptions.MAX_WORKER_THREADS.name(), + exception.getMessage()); + } + + @Test + public void testFilter_RejectsWhenFreeMemoryIsTooLow() { + setupPath("graphs/hugegraph/vertices", + List.of("graphs", "hugegraph", "vertices")); + injectProvider(this.loadDetectFilter, "configProvider", + () -> createConfig(8, Integer.MAX_VALUE)); + + ServiceUnavailableException exception = (ServiceUnavailableException) Assert.assertThrows( + ServiceUnavailableException.class, + () -> this.loadDetectFilter.filter(this.requestContext)); + + Assert.assertContains("The server available memory", + exception.getMessage()); + Assert.assertContains(ServerOptions.MIN_FREE_MEMORY.name(), + exception.getMessage()); + } + + @Test + public void testFilter_AllowsRequestWhenLoadAndMemoryAreHealthy() { + setupPath("graphs/hugegraph/vertices", + List.of("graphs", "hugegraph", "vertices")); + injectProvider(this.loadDetectFilter, "configProvider", + () -> createConfig(8, 0)); + + this.loadDetectFilter.filter(this.requestContext); + + Assert.assertEquals(1, this.workLoad.get().get()); + } + + private HugeConfig createConfig(int maxWorkerThreads, int minFreeMemory) { + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(ServerOptions.MAX_WORKER_THREADS.name(), maxWorkerThreads); + conf.setProperty(ServerOptions.MIN_FREE_MEMORY.name(), minFreeMemory); + return new HugeConfig(conf); + } + + private void setupPath(String path, List segments) { + List pathSegments = segments.stream() + .map(this::createPathSegment) + .collect(Collectors.toList()); + Mockito.when(this.uriInfo.getPath()).thenReturn(path); + Mockito.when(this.uriInfo.getPathSegments()).thenReturn(pathSegments); + } + + private PathSegment createPathSegment(String path) { + PathSegment segment = Mockito.mock(PathSegment.class); + Mockito.when(segment.getPath()).thenReturn(path); + return segment; + } + + private void injectProvider(LoadDetectFilter filter, String fieldName, + Provider provider) { + try { + java.lang.reflect.Field field = LoadDetectFilter.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(filter, provider); + } catch (Exception e) { + throw new RuntimeException("Failed to inject provider: " + fieldName, e); + } + } +}