diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java index 41ececb2234..187c581590d 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java @@ -15,7 +15,6 @@ import java.util.Objects; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionException; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -92,11 +91,9 @@ private void onMultiSubscribe(Flow.Subscriber subscriber) // As per rule 1.9, we need to throw a `java.lang.NullPointerException` // if the `Subscriber` is `null` if (subscriber == null) - { throw new NullPointerException("Flow.Subscriber must not be null"); - } - LastWillSubscription subscription = new ExhaustedSubscription(); + LastWillSubscription subscription = new ExhaustedSubscription(); // As per 1.9, this method must return normally (i.e. not throw). try { @@ -196,7 +193,7 @@ public void cancel(LastWill lastWill) private static final class ActiveSubscription extends IteratingCallback implements LastWillSubscription { private static final long NO_MORE_DEMAND = -1; - private static final Throwable COMPLETED = new StaticException("Source.Content read fully"); + private static final LastWill COMPLETED = new LastWill(new StaticException("Source.Content read fully"), FinalSignal.COMPLETE); private final AtomicReference cancelled; private final AtomicLong demand; private Content.Source content; @@ -230,6 +227,7 @@ protected Action process() // drop any references to the corresponding subscriber. this.demand.set(NO_MORE_DEMAND); // TODO: HttpChannelState does not satisfy the contract of Content.Source "If read() has returned a last chunk, this is a no operation." + // https://github.com/jetty/jetty.project/issues/11879 if (finalSignal != FinalSignal.COMPLETE) this.content.fail(reason); this.content = null; @@ -270,14 +268,16 @@ protected Action process() } catch (Throwable err) { + chunk.release(); cancel(err, FinalSignal.SUPPRESS); LOG.error("Flow.Subscriber " + subscriber + " violated rule 2.13", err); + return Action.IDLE; } chunk.release(); if (chunk.isLast()) { - cancel(COMPLETED, FinalSignal.COMPLETE); + cancel(COMPLETED); return Action.IDLE; }