Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle request with same group id #498

Merged
merged 20 commits into from
Oct 11, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ private int decodePlainFrame(
server.decodeChannel = frameHeader.channel();
server.decoder = decodePerformative;
server.readIdleTimeout = defaultIdleTimeout;
server.doSignalReadIdleTimeoutIfNecessary();
server.doSignalReadIdleTimeoutIfNecessary(traceId);
progress = frameHeader.limit();
}

Expand Down Expand Up @@ -2059,7 +2059,7 @@ private void doEncodeClose(

replyBudgetReserved += size + replyPad;
doNetworkData(traceId, authorization, 0L, payload);
doSignalCloseTimeout();
doSignalCloseTimeout(traceId);
}

private void encodeNetwork(
Expand Down Expand Up @@ -2098,7 +2098,7 @@ private void encodeNetworkData(
assert replySeq <= replyAck + replyMax :
String.format("%d <= %d + %d", replySeq, replyAck, replyMax);

doSignalWriteIdleTimeoutIfNecessary();
doSignalWriteIdleTimeoutIfNecessary(traceId);
}

final int maxLength = maxLimit - offset;
Expand Down Expand Up @@ -2432,7 +2432,8 @@ private void onReadIdleTimeoutSignal(
}
else
{
readIdleTimeoutId = signaler.signalAt(readIdleTimeoutAt, originId, routedId, replyId, READ_IDLE_SIGNAL_ID, 0);
readIdleTimeoutId = signaler.signalAt(readIdleTimeoutAt, originId, routedId, replyId, traceId,
READ_IDLE_SIGNAL_ID, 0);
}
}

Expand All @@ -2450,7 +2451,8 @@ private void onWriteIdleTimeoutSignal(
}
else
{
writeIdleTimeoutId = signaler.signalAt(writeIdleTimeoutAt, originId, routedId, replyId, WRITE_IDLE_SIGNAL_ID, 0);
writeIdleTimeoutId = signaler.signalAt(writeIdleTimeoutAt, originId, routedId, replyId,
traceId, WRITE_IDLE_SIGNAL_ID, 0);
}
}

Expand Down Expand Up @@ -2709,7 +2711,7 @@ private void onDecodeOpen(
{
onDecodeError(traceId, authorization, NOT_ALLOWED, timeoutTooSmallDescription);
}
doSignalWriteIdleTimeoutIfNecessary();
doSignalWriteIdleTimeoutIfNecessary(traceId);
}
}

Expand Down Expand Up @@ -2985,20 +2987,23 @@ private void cleanupEncodeSlotIfNecessary()
}
}

private void doSignalReadIdleTimeoutIfNecessary()
private void doSignalReadIdleTimeoutIfNecessary(
long traceId)
{
if (readIdleTimeout > 0)
{
readIdleTimeoutAt = currentTimeMillis() + readIdleTimeout;

if (readIdleTimeoutId == NO_CANCEL_ID)
{
readIdleTimeoutId = signaler.signalAt(readIdleTimeoutAt, originId, routedId, replyId, READ_IDLE_SIGNAL_ID, 0);
readIdleTimeoutId = signaler.signalAt(readIdleTimeoutAt, originId, routedId, replyId,
traceId, READ_IDLE_SIGNAL_ID, 0);
}
}
}

private void doSignalWriteIdleTimeoutIfNecessary()
private void doSignalWriteIdleTimeoutIfNecessary(
long traceId)
{
if (writeIdleTimeout > 0)
{
Expand All @@ -3007,17 +3012,18 @@ private void doSignalWriteIdleTimeoutIfNecessary()
if (writeIdleTimeoutId == NO_CANCEL_ID)
{
writeIdleTimeoutId = signaler.signalAt(writeIdleTimeoutAt, originId, routedId, replyId,
WRITE_IDLE_SIGNAL_ID, 0);
traceId, WRITE_IDLE_SIGNAL_ID, 0);
}
}
}

private void doSignalCloseTimeout()
private void doSignalCloseTimeout(
long traceId)
{
final long closeTimeoutAt = currentTimeMillis() + closeTimeout;

assert closeTimeoutId == NO_CANCEL_ID;
closeTimeoutId = signaler.signalAt(closeTimeoutAt, originId, routedId, replyId, CLOSE_SIGNAL_ID, 0);
closeTimeoutId = signaler.signalAt(closeTimeoutAt, originId, routedId, replyId, traceId, CLOSE_SIGNAL_ID, 0);
}

private final class AmqpSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void cancelTimeoutSignal(
public void signalChange(
Signaler signaler)
{
signaler.signalNow(originId, routedId, replyId, FILE_CHANGED_SIGNAL_ID, 0);
signaler.signalNow(originId, routedId, replyId, 0, FILE_CHANGED_SIGNAL_ID, 0);
}

private void register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ private void onAppBegin(
else
{
long timeoutAt = now().toEpochMilli() + beginEx.timeout();
long timeoutId = signaler.signalAt(timeoutAt, originId, routedId, replyId, TIMEOUT_EXPIRED_SIGNAL_ID, 0);
long timeoutId = signaler.signalAt(timeoutAt, originId, routedId, replyId, traceId,
TIMEOUT_EXPIRED_SIGNAL_ID, 0);
watchedFile = new FileSystemWatcher.WatchedFile(
resolvedPath, symlinks, this::calculateTag, tag, timeoutId, originId, routedId, replyId);
fileSystemWatcher.watch(watchedFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ private void onNetBegin(
if (grpcTimeout > 0L)
{
expiringId = signaler.signalAt(now().toEpochMilli() + grpcTimeout,
originId, routedId, replyId, EXPIRING_SIGNAL, 0);
originId, routedId, replyId, traceId, EXPIRING_SIGNAL, 0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ private void doKafkaBegin(
if (timeout > 0L)
{
cancelWait = signaler.signalAt(now().toEpochMilli() + timeout, originId, routedId, initialId,
SIGNAL_WAIT_EXPIRED, 0);
traceId, SIGNAL_WAIT_EXPIRED, 0);
}
}

Expand Down Expand Up @@ -3342,7 +3342,7 @@ private void doKafkaBegin(
if (timeout > 0L)
{
cancelWait = signaler.signalAt(now().toEpochMilli() + timeout, originId, routedId, initialId,
SIGNAL_WAIT_EXPIRED, 0);
traceId, SIGNAL_WAIT_EXPIRED, 0);
}
doKafkaWindow(traceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ private int decodeHeaders(
CharSequence.compare("PRI * HTTP/2.0\r\n", new AsciiSequenceView(buffer, offset, 16)) == 0)
{
server.delegate = new Http2Server(server);
signaler.signalNow(server.originId, server.routedId, server.replyId, DELEGATE_SIGNAL, 0);
signaler.signalNow(server.originId, server.routedId, server.replyId, traceId, DELEGATE_SIGNAL, 0);
return offset;
}

Expand Down Expand Up @@ -2214,7 +2214,7 @@ private void onDecodeHeaders(
String origin,
HttpBeginExFW beginEx)
{
final HttpExchange exchange = new HttpExchange(originId, routedId, authorization, policy, origin);
final HttpExchange exchange = new HttpExchange(originId, routedId, authorization, traceId, policy, origin);
exchange.doRequestBegin(traceId, beginEx);
exchange.doResponseWindow(traceId);

Expand Down Expand Up @@ -2693,6 +2693,7 @@ private HttpExchange(
long originId,
long routedId,
long sessionId,
long traceId,
HttpPolicyConfig policy,
String origin)
{
Expand All @@ -2709,7 +2710,7 @@ private HttpExchange(
this.responsePad = PADDING_CHUNKED;
this.responseRemaining = Integer.MAX_VALUE - encodeMax;

this.expiringId = expireIfNecessary(guard, sessionId, originId, routedId, replyId, 0);
this.expiringId = expireIfNecessary(guard, sessionId, originId, routedId, replyId, traceId, 0);
}

private void doRequestBegin(
Expand Down Expand Up @@ -3015,12 +3016,12 @@ private void onResponseExpiring(
else if (canChallenge(requestCaps) && guard.challenge(sessionId, now))
{
doResponseChallenge(traceId);
expiringId = signaler.signalAt(expiresAt, originId, routedId, replyId, EXPIRING_SIGNAL, 0);
expiringId = signaler.signalAt(expiresAt, originId, routedId, replyId, traceId, EXPIRING_SIGNAL, 0);
}
else
{
final long expiringAt = guard.expiringAt(sessionId);
expiringId = signaler.signalAt(expiringAt, originId, routedId, replyId, EXPIRING_SIGNAL, 0);
expiringId = signaler.signalAt(expiringAt, originId, routedId, replyId, traceId, EXPIRING_SIGNAL, 0);
}
}

Expand Down Expand Up @@ -4833,8 +4834,8 @@ else if (!isCorsRequestAllowed(binding, headers))
final String origin = policy == CROSS_ORIGIN ? headers.get(HEADER_NAME_ORIGIN) : null;

final Http2Exchange exchange =
new Http2Exchange(originId, routedId, NO_REQUEST_ID, streamId, exchangeAuth, policy,
origin, contentLength);
new Http2Exchange(originId, routedId, NO_REQUEST_ID, streamId, exchangeAuth,
traceId, policy, origin, contentLength);

if (binding.options != null && binding.options.overrides != null)
{
Expand Down Expand Up @@ -5205,7 +5206,7 @@ private void doEncodePromise(
doEncodePushPromise(traceId, authorization, pushId, promiseId, promise);

final Http2Exchange exchange = new Http2Exchange(originId, routedId, requestId, promiseId,
exchangeAuth, policy, origin, contentLength);
exchangeAuth, traceId, policy, origin, contentLength);

final HttpBeginExFW beginEx = beginExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(httpTypeId)
Expand Down Expand Up @@ -5527,6 +5528,7 @@ private Http2Exchange(
long requestId,
int streamId,
long authorization,
long traceId,
HttpPolicyConfig policy,
String origin,
long requestContentLength)
Expand All @@ -5540,7 +5542,7 @@ private Http2Exchange(
this.requestContentLength = requestContentLength;
this.requestId = requestId == NO_REQUEST_ID ? supplyInitialId.applyAsLong(routedId) : requestId;
this.responseId = supplyReplyId.applyAsLong(this.requestId);
this.expiringId = expireIfNecessary(guard, sessionId, originId, routedId, replyId, streamId);
this.expiringId = expireIfNecessary(guard, sessionId, originId, routedId, replyId, traceId, streamId);
}

private int initialWindow()
Expand Down Expand Up @@ -5974,12 +5976,12 @@ private void onResponseExpiring(
else if (canChallenge(requestCaps) && guard.challenge(sessionId, now))
{
doResponseChallenge(traceId);
expiringId = signaler.signalAt(expiresAt, originId, routedId, replyId, EXPIRING_SIGNAL, streamId);
expiringId = signaler.signalAt(expiresAt, originId, routedId, replyId, traceId, EXPIRING_SIGNAL, streamId);
}
else
{
final long expiringAt = guard.expiringAt(sessionId);
expiringId = signaler.signalAt(expiringAt, originId, routedId, replyId, EXPIRING_SIGNAL, streamId);
expiringId = signaler.signalAt(expiringAt, originId, routedId, replyId, traceId, EXPIRING_SIGNAL, streamId);
}
}

Expand Down Expand Up @@ -6804,6 +6806,7 @@ private long expireIfNecessary(
long originId,
long routedId,
long streamId,
long traceId,
int contextId)
{
long expiringId = NO_CANCEL_ID;
Expand All @@ -6812,7 +6815,8 @@ private long expireIfNecessary(
final long expiringAt = guard.expiringAt(sessionId);
if (expiringAt != EXPIRES_NEVER)
{
expiringId = signaler.signalAt(expiringAt, originId, routedId, streamId, EXPIRING_SIGNAL, contextId);
expiringId = signaler.signalAt(expiringAt, originId, routedId, streamId, traceId,
EXPIRING_SIGNAL, contextId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ private void onClientFanoutReplyFlush(

// defer reply window credit until next tick
assert reserved == SIZE_OF_FLUSH_WITH_EXTENSION;
signaler.signalNow(originId, routedId, initialId, SIGNAL_FANOUT_REPLY_WINDOW, 0);
signaler.signalNow(originId, routedId, initialId, traceId, SIGNAL_FANOUT_REPLY_WINDOW, 0);
}

private void onClientFanoutReplyEnd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ private void onClientFanMemberClosed(
else
{
this.groupCleanupId = doClientFanoutInitialSignalAt(currentTimeMillis() + SECONDS.toMillis(cleanupDelay),
SIGNAL_GROUP_CLEANUP);
traceId, SIGNAL_GROUP_CLEANUP);
}
}
}
Expand Down Expand Up @@ -750,7 +750,7 @@ private void onClientInitialData(

if ((flags & FLAGS_INCOMPLETE) != 0x00)
{
markEntryDirty(stream.partitionOffset);
markEntryDirty(traceId, stream.partitionOffset);
}

if (error != NO_ERROR)
Expand Down Expand Up @@ -1042,7 +1042,7 @@ private void ackOffsetHighWatermark(
{
while (lastAckOffsetHighWatermark <= partitionOffset)
{
final KafkaCacheEntryFW entry = markEntryDirty(lastAckOffsetHighWatermark);
final KafkaCacheEntryFW entry = markEntryDirty(traceId, lastAckOffsetHighWatermark);
final long memberStreamId = entry.ownerId();
final KafkaCacheClientProduceStream member = members.get(memberStreamId);
if (member != null)
Expand All @@ -1061,6 +1061,7 @@ private void ackOffsetHighWatermark(
}

private KafkaCacheEntryFW markEntryDirty(
long traceId,
long partitionOffset)
{
final KafkaCachePartition.Node node = this.partition.seekNotAfter(partitionOffset);
Expand All @@ -1079,7 +1080,7 @@ private KafkaCacheEntryFW markEntryDirty(
if (compactId == NO_CANCEL_ID)
{
this.compactAt = newCompactAt;
this.compactId = doClientFanoutInitialSignalAt(newCompactAt, SIGNAL_SEGMENT_COMPACT);
this.compactId = doClientFanoutInitialSignalAt(newCompactAt, traceId, SIGNAL_SEGMENT_COMPACT);
}
}

Expand Down Expand Up @@ -1139,9 +1140,10 @@ private void doClientFanReplyWindow(

private long doClientFanoutInitialSignalAt(
long timeMillis,
long traceId,
int signalId)
{
return signaler.signalAt(timeMillis, originId, routedId, initialId, signalId, 0);
return signaler.signalAt(timeMillis, originId, routedId, initialId, traceId, signalId, 0);
}
}

Expand Down Expand Up @@ -1344,7 +1346,7 @@ private void onClientInitialAbort(

if (partitionOffset != DEFAULT_LATEST_OFFSET && dataFlags != FLAGS_FIN)
{
fan.markEntryDirty(partitionOffset);
fan.markEntryDirty(traceId, partitionOffset);
}

fan.onClientFanMemberClosed(traceId, this);
Expand Down
Loading