From de6f20664ee533faaf2bf1b6e5f13693530ecf70 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sat, 27 Dec 2025 19:07:30 +0100 Subject: [PATCH] Cap total number of concurrent requests per HTTP/2 connection --- .../bootstrap/H2MultiplexingRequester.java | 115 ++++++--- .../H2MultiplexingRequesterBootstrap.java | 21 +- .../ReleasingAsyncClientExchangeHandler.java | 118 +++++++++ .../H2RequestsPerConnectionCapExample.java | 243 ++++++++++++++++++ .../H2RequestsPerConnectionCapTest.java | 145 +++++++++++ 5 files changed, 611 insertions(+), 31 deletions(-) create mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/ReleasingAsyncClientExchangeHandler.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestsPerConnectionCapExample.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequestsPerConnectionCapTest.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index f03bd20577..6c2f7b306a 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -32,7 +32,11 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.Cancellable; @@ -43,7 +47,6 @@ import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.function.Resolver; -import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; @@ -86,6 +89,8 @@ public class H2MultiplexingRequester extends AsyncRequester { private final H2ConnPool connPool; + private final ConcurrentMap inFlightPerSession; + private final int maxRequestsPerConnection; /** * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class. @@ -100,11 +105,44 @@ public H2MultiplexingRequester( final Resolver addressResolver, final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, - final IOWorkerSelector workerSelector) { + final IOWorkerSelector workerSelector, + final int maxRequestsPerConnection) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.inFlightPerSession = new ConcurrentHashMap<>(); + this.maxRequestsPerConnection = maxRequestsPerConnection; + } + + private boolean tryAcquireSlot(final IOSession ioSession, final int max) { + if (max <= 0) { + return true; + } + final AtomicInteger counter = inFlightPerSession.computeIfAbsent(ioSession, s -> new AtomicInteger(0)); + for (;;) { + final int q = counter.get(); + if (q >= max) { + return false; + } + if (counter.compareAndSet(q, q + 1)) { + return true; + } + } + } + + private void releaseSlot(final IOSession ioSession, final int max) { + if (max <= 0) { + return; + } + final AtomicInteger counter = inFlightPerSession.get(ioSession); + if (counter == null) { + return; + } + final int q = counter.decrementAndGet(); + if (q <= 0) { + inFlightPerSession.remove(ioSession, counter); + } } public void closeIdle(final TimeValue idleTime) { @@ -182,15 +220,29 @@ private void execute( if (request.getAuthority() == null) { request.setAuthority(new URIAuthority(host)); } + if (request.getScheme() == null) { + request.setScheme(host.getSchemeName()); + } connPool.getSession(host, timeout, new FutureCallback() { @Override public void completed(final IOSession ioSession) { + if (!tryAcquireSlot(ioSession, maxRequestsPerConnection)) { + exchangeHandler.failed(new RejectedExecutionException( + "Maximum number of concurrent requests per connection reached (max=" + maxRequestsPerConnection + ")")); + exchangeHandler.releaseResources(); + return; + } + + final AsyncClientExchangeHandler actual = maxRequestsPerConnection > 0 + ? new ReleasingAsyncClientExchangeHandler(exchangeHandler, () -> releaseSlot(ioSession, maxRequestsPerConnection)) + : exchangeHandler; + final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { @Override public void releaseResources() { - exchangeHandler.releaseResources(); + actual.releaseResources(); } @Override @@ -199,67 +251,70 @@ public void produceRequest(final RequestChannel channel, final HttpContext httpC } @Override - public int available() { - return exchangeHandler.available(); + public void consumeResponse( + final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { + actual.consumeResponse(response, entityDetails, httpContext); } @Override - public void produce(final DataStreamChannel channel) throws IOException { - exchangeHandler.produce(channel); + public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + actual.consumeInformation(response, httpContext); } @Override - public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeInformation(response, httpContext); + public int available() { + return actual.available(); } @Override - public void consumeResponse( - final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeResponse(response, entityDetails, httpContext); + public void produce(final DataStreamChannel channel) throws IOException { + actual.produce(channel); } @Override public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - exchangeHandler.updateCapacity(capacityChannel); + actual.updateCapacity(capacityChannel); } @Override public void consume(final ByteBuffer src) throws IOException { - exchangeHandler.consume(src); + actual.consume(src); } @Override public void streamEnd(final List trailers) throws HttpException, IOException { - exchangeHandler.streamEnd(trailers); + actual.streamEnd(trailers); } @Override public void cancel() { - exchangeHandler.cancel(); + actual.cancel(); } @Override public void failed(final Exception cause) { - exchangeHandler.failed(cause); + actual.failed(cause); } }; final Timeout socketTimeout = ioSession.getSocketTimeout(); - ioSession.enqueue(new RequestExecutionCommand( - handlerProxy, - pushHandlerFactory, - context, - streamControl -> { - cancellableDependency.setDependency(streamControl); - if (socketTimeout != null) { - streamControl.setTimeout(socketTimeout); - } - }), - Command.Priority.NORMAL); - if (!ioSession.isOpen()) { - exchangeHandler.failed(new ConnectionClosedException()); + try { + ioSession.enqueue(new RequestExecutionCommand( + handlerProxy, + pushHandlerFactory, + context, + streamControl -> { + cancellableDependency.setDependency(streamControl); + if (socketTimeout != null) { + streamControl.setTimeout(socketTimeout); + } + }), + Command.Priority.NORMAL); + } catch (final RuntimeException ex) { + actual.failed(ex); + actual.releaseResources(); } + } @Override diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index a19e7913fc..801fb4f0ac 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.function.Supplier; @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap { private IOReactorMetricsListener threadPoolListener; + private int maxRequestsPerConnection; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -180,6 +183,21 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final return this; } + /** + * Sets a hard cap on the number of requests allowed to be queued / in-flight per connection. + * When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. + * A value {@code <= 0} means unlimited (default). + * + * @param max maximum number of requests per connection; {@code <= 0} to disable the cap + * @return this instance. + * @since 5.5 + */ + @Experimental + public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) { + this.maxRequestsPerConnection = max; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -274,7 +292,8 @@ public H2MultiplexingRequester create() { DefaultAddressResolver.INSTANCE, tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, - null); + null, + maxRequestsPerConnection); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/ReleasingAsyncClientExchangeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/ReleasingAsyncClientExchangeHandler.java new file mode 100644 index 0000000000..a4a18194c7 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/ReleasingAsyncClientExchangeHandler.java @@ -0,0 +1,118 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; + +final class ReleasingAsyncClientExchangeHandler implements AsyncClientExchangeHandler { + + private final AsyncClientExchangeHandler exchangeHandler; + private final Runnable onRelease; + private final AtomicBoolean released; + + ReleasingAsyncClientExchangeHandler(final AsyncClientExchangeHandler exchangeHandler, final Runnable onRelease) { + this.exchangeHandler = exchangeHandler; + this.onRelease = onRelease; + this.released = new AtomicBoolean(false); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException { + exchangeHandler.produceRequest(channel, context); + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) + throws HttpException, IOException { + exchangeHandler.consumeResponse(response, entityDetails, context); + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException { + exchangeHandler.consumeInformation(response, context); + } + + @Override + public int available() { + return exchangeHandler.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + exchangeHandler.produce(channel); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + exchangeHandler.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + exchangeHandler.consume(src); + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + exchangeHandler.streamEnd(trailers); + } + + @Override + public void failed(final Exception cause) { + exchangeHandler.failed(cause); + } + + @Override + public void cancel() { + exchangeHandler.cancel(); + } + + @Override + public void releaseResources() { + try { + exchangeHandler.releaseResources(); + } finally { + if (released.compareAndSet(false, true)) { + onRelease.run(); + } + } + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestsPerConnectionCapExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestsPerConnectionCapExample.java new file mode 100644 index 0000000000..c797cf9af9 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2RequestsPerConnectionCapExample.java @@ -0,0 +1,243 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.Timeout; + +/** + * Validates a per-connection "max requests in flight / queued" cap for HTTP/2. + *

+ * Expected behaviour (cap=1): + * - First slow request executes and holds the slot. + * - Second overlapping request fails fast with RejectedExecutionException. + * - After the first completes, the second can be re-submitted and succeeds. + * + * @since 5.5 + */ +@Experimental +public class H2RequestsPerConnectionCapExample { + + public static void main(final String[] args) throws Exception { + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final int maxRequestsPerConnection = 1; + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setH2Config(h2Config) + .setMaxRequestsPerConnection(maxRequestsPerConnection) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final HttpHost target = new HttpHost("https", "nghttp2.org"); + + // Warm-up to get a session established in the pool. + final ClassicHttpRequest warmup = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/httpbin/ip") + .build(); + + System.out.println("Warm-up: " + warmup); + executeBlocking(requester, target, warmup, Timeout.ofMinutes(1)); + + // Slow request to hold the slot + final ClassicHttpRequest r1 = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/httpbin/delay/5?i=0") + .build(); + + // Overlapping request expected to be rejected fast + final ClassicHttpRequest r2 = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/httpbin/delay/5?i=1") + .build(); + + final CountDownLatch r1Started = new CountDownLatch(1); + final CountDownLatch r1Done = new CountDownLatch(1); + + final Thread t1 = new Thread(() -> { + try { + r1Started.countDown(); + System.out.println("Submitting r1 (should execute): " + r1); + executeBlocking(requester, target, r1, Timeout.ofMinutes(2)); + System.out.println("r1 completed"); + } catch (final Exception ex) { + System.out.println("r1 failed: " + ex); + } finally { + r1Done.countDown(); + } + }, "r1-thread"); + t1.setDaemon(true); + t1.start(); + + // Ensure r1 submission path is underway before attempting r2. + r1Started.await(); + + // Submit r2 and validate fast rejection via callback. + final ClassicToAsyncRequestProducer p2 = new ClassicToAsyncRequestProducer(r2, Timeout.ofMinutes(2)); + final ClassicToAsyncResponseConsumer c2 = new ClassicToAsyncResponseConsumer(Timeout.ofMinutes(2)); + final AtomicReference r2Failure = new AtomicReference<>(); + final CountDownLatch r2Callback = new CountDownLatch(1); + + final long t0 = System.currentTimeMillis(); + System.out.println("Submitting r2 (should reject fast): " + r2); + + requester.execute( + target, + p2, + c2, + Timeout.ofMinutes(2), + HttpCoreContext.create(), + new FutureCallback() { + + @Override + public void completed(final Void result) { + r2Callback.countDown(); + } + + @Override + public void failed(final Exception ex) { + r2Failure.set(ex); + r2Callback.countDown(); + } + + @Override + public void cancelled() { + r2Failure.set(new RuntimeException("cancelled")); + r2Callback.countDown(); + } + + }); + + // Drive the classic bridge (this is required for these adapters) + try { + p2.blockWaiting().execute(); + try (ClassicHttpResponse response = c2.blockWaiting()) { + drainEntity(response); + System.out.println("r2 unexpectedly succeeded: " + response.getCode()); + } + } catch (final Exception ex) { + // Depending on where you reject, you may see it here as well. + if (r2Failure.get() == null) { + r2Failure.set(ex); + } + } + + // The core validation: rejection observed quickly (well under 5 seconds). + r2Callback.await(2, TimeUnit.SECONDS); + final long dt = System.currentTimeMillis() - t0; + + final Exception failure = r2Failure.get(); + if (failure instanceof RejectedExecutionException) { + System.out.println("r2 rejected in " + dt + " ms: " + failure.getMessage()); + } else { + System.out.println("r2 result in " + dt + " ms: " + failure); + } + + // Wait for r1 to finish (slot release) + r1Done.await(); + + // Re-submit r2 (should execute now) + System.out.println("Re-submitting r2 after r1 completion (should execute): " + r2); + executeBlocking(requester, target, r2, Timeout.ofMinutes(2)); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + + private static void executeBlocking( + final H2MultiplexingRequester requester, + final HttpHost target, + final ClassicHttpRequest request, + final Timeout timeout) throws Exception { + + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request, timeout); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(timeout); + + requester.execute( + target, + requestProducer, + responseConsumer, + timeout, + HttpCoreContext.create(), + null); + + requestProducer.blockWaiting().execute(); + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + System.out.println(request + " -> " + response.getCode()); + drainEntity(response); + } + } + + private static void drainEntity(final ClassicHttpResponse response) throws Exception { + final HttpEntity entity = response.getEntity(); + if (entity == null) { + return; + } + final String contentTypeValue = entity.getContentType(); + final ContentType contentType = contentTypeValue != null ? ContentType.parse(contentTypeValue) : null; + final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) { + while (reader.readLine() != null) { + // drain + } + } + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequestsPerConnectionCapTest.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequestsPerConnectionCapTest.java new file mode 100644 index 0000000000..b30d5a4b0f --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequestsPerConnectionCapTest.java @@ -0,0 +1,145 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.http.impl.DefaultAddressResolver; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOSession; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +class H2RequestsPerConnectionCapTest { + + @Mock + private IOSession ioSession; + + @Mock + private AsyncClientExchangeHandler delegate; + + private H2MultiplexingRequester requester; + + private int max = 0; + + @BeforeEach + void setUp() { + + + + MockitoAnnotations.openMocks(this); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom().build(); + final IOEventHandlerFactory eventHandlerFactory = (session, attachment) -> null; + + requester = new H2MultiplexingRequester( + ioReactorConfig, + eventHandlerFactory, + null, + null, + null, + DefaultAddressResolver.INSTANCE, + new H2ClientTlsStrategy(), + null, + null, + max); + } + + private boolean tryAcquireSlot(final IOSession session, final int max) throws Exception { + final Method method = H2MultiplexingRequester.class + .getDeclaredMethod("tryAcquireSlot", IOSession.class, int.class); + method.setAccessible(true); + return (Boolean) method.invoke(requester, session, max); + } + + private void releaseSlot(final IOSession session, final int max) throws Exception { + final Method method = H2MultiplexingRequester.class + .getDeclaredMethod("releaseSlot", IOSession.class, int.class); + method.setAccessible(true); + method.invoke(requester, session, max); + } + + @Test + void testAcquireUpToMaxThenReject() throws Exception { + this.max = 2; + + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + Assertions.assertFalse(tryAcquireSlot(ioSession, max)); + } + + @Test + void testReleaseFreesSlot() throws Exception { + this.max = 2; + + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + + releaseSlot(ioSession, max); + + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + } + + @Test + void testUnlimitedWhenMaxNonPositive() throws Exception { + this.max = 0; + + for (int i = 0; i < 10_000; i++) { + Assertions.assertTrue(tryAcquireSlot(ioSession, max)); + } + } + + @Test + void testReleasingHandlerInvokesCallbackExactlyOnce() { + final AtomicInteger callbackCount = new AtomicInteger(0); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + + final ReleasingAsyncClientExchangeHandler handler = new ReleasingAsyncClientExchangeHandler( + delegate, + () -> { + callbackCount.incrementAndGet(); + callbackCalled.set(true); + }); + + handler.releaseResources(); + handler.releaseResources(); + + Assertions.assertTrue(callbackCalled.get()); + Assertions.assertEquals(1, callbackCount.get()); + Mockito.verify(delegate, Mockito.times(2)).releaseResources(); + } + +}