Skip to content

Commit

Permalink
Improvements on suggested PR.
Browse files Browse the repository at this point in the history
* Made HttpChannel Dispatchable fields for REQUEST and ASYNC dispatches.
* Made AdaptiveExecutionStrategy implement Runnable to remove lambda/anonymous field.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jul 17, 2023
1 parent 5db0c7b commit 9e16d81
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
private final Request _request;
private final Response _response;
private final HttpChannel.Listener _combinedListener;
private final Dispatchable _requestDispatcher;
private final Dispatchable _asyncDispatcher;
@Deprecated
private final List<Listener> _transientListeners = new ArrayList<>();
private MetaData.Response _committedMetaData;
Expand All @@ -110,6 +112,8 @@ public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoin
_combinedListener = (connector instanceof AbstractConnector)
? ((AbstractConnector)connector).getHttpChannelListeners()
: NOOP_LISTENER;
_requestDispatcher = new RequestDispatchable();
_asyncDispatcher = new AsyncDispatchable();

if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",
Expand Down Expand Up @@ -494,27 +498,14 @@ public boolean handle()
if (!_request.hasMetaData())
throw new IllegalStateException("state=" + _state);

dispatch(DispatcherType.REQUEST, new Dispatchable()
{
@Override
public void dispatch() throws IOException, ServletException
{
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
{
customizer.customize(getConnector(), _configuration, _request);
if (_request.isHandled())
return;
}
getServer().handle(HttpChannel.this);
}
});
dispatch(DispatcherType.REQUEST, _requestDispatcher);

break;
}

case ASYNC_DISPATCH:
{
dispatch(DispatcherType.ASYNC, () -> getServer().handleAsync(this));
dispatch(DispatcherType.ASYNC, _asyncDispatcher);
break;
}

Expand Down Expand Up @@ -553,11 +544,7 @@ public void dispatch() throws IOException, ServletException
break;
}

dispatch(DispatcherType.ERROR, () ->
{
errorHandler.handle(null, _request, _request, _response);
_request.setHandled(true);
});
dispatch(DispatcherType.ERROR, new ErrorDispatchable(errorHandler));
}
catch (Throwable x)
{
Expand Down Expand Up @@ -1596,4 +1583,45 @@ public void onComplete(Request request)
request.getHttpChannel().notifyEvent1(listener -> listener::onComplete, request);
}
}

private class RequestDispatchable implements Dispatchable
{
@Override
public void dispatch() throws IOException, ServletException
{
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
{
customizer.customize(getConnector(), _configuration, _request);
if (_request.isHandled())
return;
}
getServer().handle(HttpChannel.this);
}
}

private class AsyncDispatchable implements Dispatchable
{
@Override
public void dispatch() throws IOException, ServletException
{
getServer().handleAsync(HttpChannel.this);
}
}

private class ErrorDispatchable implements Dispatchable
{
private final ErrorHandler _errorHandler;

public ErrorDispatchable(ErrorHandler errorHandler)
{
_errorHandler = errorHandler;
}

@Override
public void dispatch() throws IOException, ServletException
{
_errorHandler.handle(null, _request, _request, _response);
_request.setHandled(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
* that says that a hunter should eat (i.e. consume) what they kill (i.e. produced).</p>
*/
@ManagedObject("Adaptive execution strategy")
public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy
public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy, Runnable
{
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class);

Expand Down Expand Up @@ -137,14 +137,6 @@ private enum SubStrategy
private final Executor _executor;
private final TryExecutor _tryExecutor;
private final Executor _virtualExecutor;
private final Runnable _runPendingProducer = new Runnable()
{
@Override
public void run()
{
tryProduce(true);
}
};
private State _state = State.IDLE;
private boolean _pending;

Expand Down Expand Up @@ -192,7 +184,7 @@ public void dispatch()
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch {}", this, execute);
if (execute)
_executor.execute(_runPendingProducer);
_executor.execute(this);
}

@Override
Expand All @@ -201,6 +193,12 @@ public void produce()
tryProduce(false);
}

@Override
public void run()
{
tryProduce(true);
}

/**
* Tries to become the producing thread and then produces and consumes tasks.
*
Expand Down Expand Up @@ -315,7 +313,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
try (AutoLock l = _lock.lock())
{
// If a pending producer is available or one can be started
if (_pending || _tryExecutor.tryExecute(_runPendingProducer))
if (_pending || _tryExecutor.tryExecute(this))
{
// Use EPC: the producer directly consumes the task, which may block
// and then races with the pending producer to resume production.
Expand All @@ -339,7 +337,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
try (AutoLock l = _lock.lock())
{
// If a pending producer is available or one can be started
if (_pending || _tryExecutor.tryExecute(_runPendingProducer))
if (_pending || _tryExecutor.tryExecute(this))
{
// use EPC: The producer directly consumes the task, which may block
// and then races with the pending producer to resume production.
Expand Down

0 comments on commit 9e16d81

Please sign in to comment.