diff --git a/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java b/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
index 14062c98b8f7..afdb2617a872 100644
--- a/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
@@ -157,7 +157,7 @@ void sendStreamReset(StreamStateMachine state, StreamException se) throws IOExce
boolean active = state.isActive();
state.sendReset();
if (active) {
- decrementActiveRemoteStreamCount();
+ decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
}
}
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index d656a564ae1a..db73830d4511 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -290,8 +290,8 @@ protected void processStreamOnContainerThread(Stream stream) {
}
- protected void decrementActiveRemoteStreamCount() {
- setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet());
+ protected void decrementActiveRemoteStreamCount(Stream stream) {
+ setConnectionTimeoutForStreamCount(stream.decrementAndGetActiveRemoteStreamCount());
}
@@ -598,7 +598,7 @@ void sendStreamReset(StreamStateMachine state, StreamException se) throws IOExce
boolean active = state.isActive();
state.sendReset();
if (active) {
- decrementActiveRemoteStreamCount();
+ decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
}
}
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
@@ -825,7 +825,7 @@ void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws
protected void sentEndOfStream(Stream stream) {
stream.sentEndOfStream();
if (!stream.isActive()) {
- decrementActiveRemoteStreamCount();
+ decrementActiveRemoteStreamCount(stream);
}
}
@@ -1208,7 +1208,7 @@ private int allocate(AbstractStream stream, int allocation) {
}
- private Stream getStream(int streamId) {
+ Stream getStream(int streamId) {
Integer key = Integer.valueOf(streamId);
AbstractStream result = streams.get(key);
if (result instanceof Stream) {
@@ -1536,6 +1536,7 @@ public HeaderEmitter headersStart(int streamId, boolean headersEndStream) throws
Stream stream = getStream(streamId, false);
if (stream == null) {
stream = createRemoteStream(streamId);
+ activeRemoteStreamCount.incrementAndGet();
}
if (streamId < maxActiveRemoteStreamId) {
throw new ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId),
@@ -1597,9 +1598,8 @@ public void headersEnd(int streamId, boolean endOfStream) throws Http2Exception
Stream stream = (Stream) abstractNonZeroStream;
if (stream.isActive()) {
if (stream.receivedEndOfHeaders()) {
-
- if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) {
- decrementActiveRemoteStreamCount();
+ if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.get()) {
+ decrementActiveRemoteStreamCount(stream);
// Ignoring maxConcurrentStreams increases the overhead count
increaseOverheadCount(FrameType.HEADERS);
throw new StreamException(
@@ -1643,7 +1643,7 @@ public void receivedEndOfStream(int streamId) throws ConnectionException {
private void receivedEndOfStream(Stream stream) throws ConnectionException {
stream.receivedEndOfStream();
if (!stream.isActive()) {
- decrementActiveRemoteStreamCount();
+ decrementActiveRemoteStreamCount(stream);
}
}
@@ -1669,7 +1669,7 @@ public void reset(int streamId, long errorCode) throws Http2Exception {
boolean active = stream.isActive();
stream.receiveReset(errorCode);
if (active) {
- decrementActiveRemoteStreamCount();
+ decrementActiveRemoteStreamCount(stream);
}
}
}
diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java
index 5bb4dfba0f91..f8e80be5b89d 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -25,6 +25,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -88,6 +89,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter {
private final StreamInputBuffer inputBuffer;
private final StreamOutputBuffer streamOutputBuffer = new StreamOutputBuffer();
private final Http2OutputBuffer http2OutputBuffer = new Http2OutputBuffer(coyoteResponse, streamOutputBuffer);
+ private final AtomicBoolean removedFromActiveCount = new AtomicBoolean(false);
// State machine would be too much overhead
private int headerState = HEADER_STATE_START;
@@ -838,6 +840,20 @@ public void setIncremental(boolean incremental) {
}
+ int decrementAndGetActiveRemoteStreamCount() {
+ /*
+ * Protect against mis-counting of active streams. This method should only be called once per stream but since
+ * the count of active streams is used to enforce the maximum concurrent streams limit, make sure each stream is
+ * only removed from the active count exactly once.
+ */
+ if (removedFromActiveCount.compareAndSet(false, true)) {
+ return handler.activeRemoteStreamCount.decrementAndGet();
+ } else {
+ return handler.activeRemoteStreamCount.get();
+ }
+ }
+
+
class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
private final Lock writeLock = new ReentrantLock();
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index c956b103354d..de09f3386617 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -161,6 +161,10 @@
Connector
element, similar to the Executor
element, for consistency. (remm)
+
+ Make counting of active HTTP/2 streams per connection more robust.
+ (markt)
+