Skip to content

Commit

Permalink
review #1
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Nov 8, 2022
1 parent 3dfd3db commit 51ba021
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,19 @@ public abstract class HttpReceiver
private final SerializedInvoker invoker = new SerializedInvoker();
private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE;
private Content.Source contentSource;
private ContentSource original;
private NotifiableContentSource contentSource;
private Throwable failure;

protected HttpReceiver(HttpChannel channel)
{
this.channel = channel;
}

protected ContentSource getContentSource()
public void receive()
{
return original;
contentSource.onDataAvailable();
}

public abstract void receive();

protected abstract Content.Chunk read(boolean fillInterestIfNeeded);

protected abstract void onEofConsumed();
Expand All @@ -107,6 +104,11 @@ public boolean isFailed()
return responseState == ResponseState.FAILURE;
}

public boolean isContent()
{
return responseState == ResponseState.CONTENT;
}

/**
* Method to be invoked when the response status code is available.
* <p>
Expand Down Expand Up @@ -246,9 +248,9 @@ protected void responseHeaders(HttpExchange exchange)
if (LOG.isDebugEnabled())
LOG.debug("Switching to CONTENT state");
responseState = ResponseState.CONTENT;
if (original != null)
if (contentSource != null)
throw new IllegalStateException();
contentSource = original = new ContentSource(invoker, this);
contentSource = new ContentSource();

List<Response.ContentSourceListener> contentListeners = responseListeners.stream()
.filter(l -> l instanceof Response.ContentSourceListener)
Expand Down Expand Up @@ -279,6 +281,11 @@ protected void responseHeaders(HttpExchange exchange)
});
}

protected void responseContentAvailable()
{

}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -421,7 +428,6 @@ protected void dispose()
private void cleanup()
{
contentSource = null;
original = null;
}

public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> promise)
Expand Down Expand Up @@ -495,19 +501,32 @@ private enum ResponseState
FAILURE
}

private static class DecodingContentSource extends ContentSourceTransformer
private interface NotifiableContentSource extends Content.Source
{
void onDataAvailable();
}

private static class DecodingContentSource extends ContentSourceTransformer implements NotifiableContentSource
{
private static final Logger LOG = LoggerFactory.getLogger(DecodingContentSource.class);

private final NotifiableContentSource _rawSource;
private final ContentDecoder _decoder;
private volatile Content.Chunk _chunk;
private Content.Chunk _chunk;

public DecodingContentSource(Content.Source rawSource, ContentDecoder decoder)
public DecodingContentSource(NotifiableContentSource rawSource, ContentDecoder decoder)
{
super(rawSource);
_rawSource = rawSource;
_decoder = decoder;
}

@Override
public void onDataAvailable()
{
_rawSource.onDataAvailable();
}

@Override
protected Content.Chunk transform(Content.Chunk inputChunk)
{
Expand Down Expand Up @@ -564,22 +583,14 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
}
}

protected static class ContentSource implements Content.Source
private class ContentSource implements NotifiableContentSource
{
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);

private final SerializedInvoker invoker;
private final HttpReceiver receiver;
private Content.Chunk currentChunk;
private Runnable demandCallback;
private boolean responseSucceeded;

public ContentSource(SerializedInvoker invoker, HttpReceiver receiver)
{
this.invoker = invoker;
this.receiver = receiver;
}

@Override
public Content.Chunk read()
{
Expand All @@ -588,10 +599,11 @@ public Content.Chunk read()
Content.Chunk chunk = consumeCurrentChunk();
if (chunk != null)
return chunk;
currentChunk = receiver.read(false);
currentChunk = HttpReceiver.this.read(false);
return consumeCurrentChunk();
}

@Override
public void onDataAvailable()
{
if (LOG.isDebugEnabled())
Expand All @@ -607,7 +619,7 @@ private Content.Chunk consumeCurrentChunk()
if (currentChunk == Content.Chunk.EOF && !responseSucceeded)
{
responseSucceeded = true;
receiver.onEofConsumed();
HttpReceiver.this.onEofConsumed();
}
Content.Chunk chunk = currentChunk;
currentChunk = Content.Chunk.next(chunk);
Expand Down Expand Up @@ -640,7 +652,7 @@ private void meetDemand()
}
else
{
currentChunk = receiver.read(true);
currentChunk = HttpReceiver.this.read(true);
if (currentChunk == null)
return;
}
Expand Down Expand Up @@ -672,7 +684,7 @@ public void fail(Throwable failure)
if (currentChunk != null)
{
currentChunk.release();
receiver.failAndClose(failure);
HttpReceiver.this.failAndClose(failure);
}
currentChunk = Content.Chunk.from(failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,15 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
@Override
public void receive()
{
// This method is the callback of fill interest.
// As such, it is called repeatedly until the ContentSourceListener.onContentSource() loop gets started;
// meaning firstContent is false and it must register for fill interest if no filling was done
// until onContentSource() gets called.
// Once onContentSource() gets called, firstContent is true and it must just notify that content may be generated.

ContentSource contentSource = getContentSource();
if (contentSource == null)
if (!isContent())
{
boolean setFillInterest = parseAndFill();
if (getContentSource() == null && setFillInterest)
if (!isContent() && setFillInterest)
fillInterested();
}
else
{
// This calls the demand callback of the onContentSource loop.
contentSource.onDataAvailable();
super.receive();
}
}

Expand Down Expand Up @@ -445,7 +437,7 @@ public boolean content(ByteBuffer buffer)
chunk = Content.Chunk.from(buffer, false, networkBuffer);
if (LOG.isDebugEnabled())
LOG.debug("Setting action to contentSource.onDataAvailable()");
if (actionRef.getAndSet(getContentSource()::onDataAvailable) != null)
if (actionRef.getAndSet(super::receive) != null)
throw new IllegalStateException();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ public HttpReceiverOverFCGI(HttpChannel channel)
@Override
public void receive()
{
ContentSource contentSource = getContentSource();
if (getContentSource() == null)
if (!isContent())
{
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill();
if (getContentSource() == null && setFillInterest)
if (!isContent() && setFillInterest)
httpConnection.fillInterested();
}
else
{
contentSource.onDataAvailable();
super.receive();
}
}

Expand Down Expand Up @@ -89,7 +88,8 @@ void content(Content.Chunk chunk)
if (this.chunk != null)
throw new IllegalStateException();
this.chunk = chunk;
getContentSource().onDataAvailable();

super.receive();
}

void end(HttpExchange exchange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public HttpReceiverOverHTTP2(HttpChannel channel)
super(channel);
}

@Override
public void receive()
{
onDataAvailable();
}

@Override
public Content.Chunk read(boolean fillInterestIfNeeded)
{
Expand Down Expand Up @@ -218,7 +212,7 @@ public void onDataAvailable()
if (exchange == null)
return;

getContentSource().onDataAvailable();
receive();
}

void onReset(ResetFrame frame)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
super(channel);
}

@Override
public void receive()
{
onDataAvailable(null);
}

@Override
public Content.Chunk read(boolean fillInterestIfNeeded)
{
Expand Down Expand Up @@ -116,9 +110,7 @@ public void onDataAvailable(Stream.Client stream)
if (exchange == null)
return;

ContentSource contentSource = getContentSource();
if (contentSource != null)
contentSource.onDataAvailable();
receive();
}

@Override
Expand Down

0 comments on commit 51ba021

Please sign in to comment.