Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Content.Chunk.isTerminal() contract stricter #8994

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,38 @@ private void registerDemand()

private class ContentSource implements Content.Source
{
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk()
{
@Override
public ByteBuffer getByteBuffer()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isLast()
{
throw new UnsupportedOperationException();
}

@Override
public void retain()
{
throw new UnsupportedOperationException();
}

@Override
public boolean release()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
return "ALREADY_READ_CHUNK";
}
};
private final int index;
private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk chunk;
Expand All @@ -368,8 +400,8 @@ private void onChunk(Content.Chunk chunk)
Content.Chunk currentChunk = this.chunk;
if (LOG.isDebugEnabled())
LOG.debug("Registering content in multiplexed content source #{} that contains {}", index, currentChunk);
if (currentChunk == null || currentChunk == AlreadyReadChunk.INSTANCE)
this.chunk = Content.Chunk.slice(chunk);
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
this.chunk = chunk.slice();
else if (!currentChunk.isLast())
throw new IllegalStateException("Cannot overwrite chunk");
onDemandCallback();
Expand All @@ -396,7 +428,7 @@ private void onDemandCallback()
@Override
public Content.Chunk read()
{
if (chunk == AlreadyReadChunk.INSTANCE)
if (chunk == ALREADY_READ_CHUNK)
{
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} already read current chunk", index);
Expand All @@ -405,7 +437,7 @@ public Content.Chunk read()

Content.Chunk result = chunk;
if (result != null && !result.isTerminal())
chunk = AlreadyReadChunk.INSTANCE;
chunk = ALREADY_READ_CHUNK;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} reading current chunk {}", index, result);
return result;
Expand All @@ -419,7 +451,7 @@ public void demand(Runnable demandCallback)
Content.Chunk currentChunk = this.chunk;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} demand while current chunk is {}", index, currentChunk);
if (currentChunk == null || currentChunk == AlreadyReadChunk.INSTANCE)
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
registerDemand();
else
onDemandCallback();
Expand All @@ -440,44 +472,5 @@ public void fail(Throwable failure)
registerFailure(failure);
}
}

private static final class AlreadyReadChunk implements Content.Chunk
{
static final AlreadyReadChunk INSTANCE = new AlreadyReadChunk();

private AlreadyReadChunk()
{
}

@Override
public ByteBuffer getByteBuffer()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isLast()
{
throw new UnsupportedOperationException();
}

@Override
public void retain()
{
throw new UnsupportedOperationException();
}

@Override
public boolean release()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
return getClass().getSimpleName();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testEmptyStringBoundary() throws Exception
public void testNoBody() throws Exception
{
MultiPartFormData formData = new MultiPartFormData("boundary");
formData.parse(Content.Chunk.from(ByteBuffer.allocate(0), true));
formData.parse(Content.Chunk.EOF);

formData.handle((parts, failure) ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testParseNoPartsInTwoChunks()

assertEquals(0, listener.events.size());

parser.parse(Content.Chunk.from(BufferUtil.EMPTY_BUFFER, true));
parser.parse(Content.Chunk.EOF);
lorban marked this conversation as resolved.
Show resolved Hide resolved

assertEquals(1, listener.events.size());
assertEquals("complete", listener.events.poll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,16 @@ public void onData(long streamId, DataFrame frame)
if (frame.isLast())
shutdownInput();

networkBuffer.retain();
StreamData data = new StreamData(frame, networkBuffer);
Stream.Data data;
if (!frame.getByteBuffer().hasRemaining() && frame.isLast())
{
data = Stream.Data.EOF;
}
else
{
networkBuffer.retain();
data = new StreamData(frame, networkBuffer);
}

Runnable existing = action.getAndSet(() ->
{
Expand Down
92 changes: 60 additions & 32 deletions jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,9 @@ public interface Chunk extends Retainable
*/
static Chunk from(ByteBuffer byteBuffer, boolean last)
{
return new ByteBufferChunk.WithReferenceCount(byteBuffer, last);
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.WithReferenceCount(byteBuffer, last);
return last ? EOF : EMPTY;
}

/**
Expand All @@ -453,11 +455,15 @@ static Chunk from(ByteBuffer byteBuffer, boolean last)
*
* @param byteBuffer the ByteBuffer with the bytes of this Chunk
* @param last whether the Chunk is the last one
* @param releaser the code to run when this Chunk is released
* @return a new Chunk
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Runnable releaser)
{
return new ByteBufferChunk.ReleasedByRunnable(byteBuffer, last, Objects.requireNonNull(releaser));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.ReleasedByRunnable(byteBuffer, last, Objects.requireNonNull(releaser));
releaser.run();
return last ? EOF : EMPTY;
}

/**
Expand All @@ -471,7 +477,10 @@ static Chunk from(ByteBuffer byteBuffer, boolean last, Runnable releaser)
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> releaser)
{
return new ByteBufferChunk.ReleasedByConsumer(byteBuffer, last, Objects.requireNonNull(releaser));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.ReleasedByConsumer(byteBuffer, last, Objects.requireNonNull(releaser));
releaser.accept(byteBuffer);
return last ? EOF : EMPTY;
}

/**
Expand All @@ -486,7 +495,10 @@ static Chunk from(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> rele
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Retainable retainable)
{
return new ByteBufferChunk.WithRetainable(byteBuffer, last, Objects.requireNonNull(retainable));
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.WithRetainable(byteBuffer, last, Objects.requireNonNull(retainable));
retainable.release();
return last ? EOF : EMPTY;
}

/**
Expand Down Expand Up @@ -539,28 +551,6 @@ static Chunk next(Chunk chunk)
return null;
}

/**
* <p>Creates a chunk that is a slice of the given chunk.</p>
* <p>
* The chunk slice has a byte buffer that is a slice of the original chunk's byte buffer, the last flag
* copied over and the original chunk used as the retainable of the chunk slice.
* Note that after this method returns, an extra Chunk instance refers to the same byte buffer,
* so the original chunk is retained and is used as the {@link Retainable} for the returned chunk.
* </p>
* <p>Passing a null chunk returns null.</p>
* <p>Passing a terminal chunk returns it.</p>
*
* @param chunk the chunk to slice.
* @return the chunk slice.
*/
static Chunk slice(Chunk chunk)
{
if (chunk == null || chunk.isTerminal())
return chunk;
chunk.retain();
return Chunk.from(chunk.getByteBuffer().slice(), chunk.isLast(), chunk);
}

/**
* @return the ByteBuffer of this Chunk
*/
Expand All @@ -571,9 +561,38 @@ static Chunk slice(Chunk chunk)
*/
boolean isLast();

/**
* <p>Returns a new {@code Chunk} whose {@code ByteBuffer} is a slice of the
* {@code ByteBuffer} of the source {@code Chunk} unless the source
* {@link #hasRemaining() has no remaining byte} in which case:</p>
* <ul>
* <li>{@code this} is returned if it is an instance of {@link Error}</li>
* <li>{@link #EOF} is returned if {@link #isLast()} is {@code true}</li>
* <li>{@link #EMPTY} is returned if {@link #isLast()} is {@code false}</li>
* </ul>
* <p>If the source has remaining bytes, the returned {@code Chunk} retains
* the source {@code Chunk} and it is linked to it via
* {@link #from(ByteBuffer, boolean, Retainable)}.</p>
*
* @return a new {@code Chunk} retained from the source {@code Chunk} with a slice
* of the source {@code Chunk}'s {@code ByteBuffer}
*/
default Chunk slice()
{
if (isTerminal())
return this;
if (!hasRemaining())
return EMPTY;
retain();
return from(getByteBuffer().slice(), isLast(), this);
}

/**
* <p>Returns a new {@code Chunk} whose {@code ByteBuffer} is a slice, with the given
* position and limit, of the {@code ByteBuffer} of the source {@code Chunk}.</p>
* position and limit, of the {@code ByteBuffer} of the source {@code Chunk} unless the
* source is {@link #isTerminal() terminal} in which case {@code this} is returned, or
* if {@code position == limit} in which case {@link #EOF} or {@link #EMPTY} is
* returned depending on the value of {@code last}.</p>
* <p>The returned {@code Chunk} retains the source {@code Chunk} and it is linked
* to it via {@link #from(ByteBuffer, boolean, Retainable)}.</p>
*
Expand All @@ -587,6 +606,8 @@ default Chunk slice(int position, int limit, boolean last)
{
if (isTerminal())
return this;
if (position == limit)
return last ? EOF : EMPTY;
ByteBuffer sourceBuffer = getByteBuffer();
int sourceLimit = sourceBuffer.limit();
sourceBuffer.limit(limit);
Expand Down Expand Up @@ -649,20 +670,27 @@ default int skip(int length)

/**
* <p>Returns whether this Chunk is a <em>terminal</em> chunk.</p>
* <p>A terminal chunk is either an {@link Error error chunk},
* or a Chunk that {@link #isLast()} is true and has no remaining
* bytes.</p>
* <p>A terminal chunk is a Chunk that {@link #isLast()} is true
* and has no remaining bytes.</p>
* <p><em>Terminal</em> chunks cannot be lifecycled using the
* {@link Retainable} contract like other chunks: they always throw
* {@link UnsupportedOperationException} on {@link #retain()} and
* always return {@code true} on {@link #release()}. As such, they
* cannot contain a recyclable buffer and calling their
* {@link #release()} method isn't necessary, although harmless.
* </p>
*
* @return whether this Chunk is a terminal chunk
*/
default boolean isTerminal()
{
return this instanceof Error || isLast() && !hasRemaining();
return isLast() && !hasRemaining();
}

/**
* <p>A chunk that wraps a failure.</p>
* <p>Error Chunks are always last and have no bytes to read.</p>
* <p>Error Chunks are always last and have no bytes to read,
* as such they are <em>terminal</em> Chunks.</p>
*
* @see #from(Throwable)
*/
Expand Down
Loading