diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ccead21dcd..ff52583221 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -842,10 +842,15 @@ private CompletableFuture streamAsync(RaftClientRequest request newExceptionReply(request, generateNotLeaderException()))); } - private CompletableFuture streamEndOfRequestAsync(RaftClientRequest request) { + CompletableFuture streamEndOfRequestAsync(RaftClientRequest request) { return role.getLeaderState() - .map(ls -> ls.streamEndOfRequestAsync(request)) - .orElse(null); + .map(ls -> ls.streamEndOfRequestAsync(request)) + .orElseGet(() -> { + final CompletableFuture errorF = new CompletableFuture<>(); + errorF.completeExceptionally( + new Exception("Unexpected null encountered, while receiving end of stream request.")); + return errorF; + }); } CompletableFuture processQueryFuture( @@ -1560,21 +1565,28 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( return reply; } - Optional leaderPeerInfo = null; + RaftPeerProto leaderPeerInfo = null; if (request.hasLastRaftConfigurationLogEntryProto()) { List peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry() .getPeersList(); - leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst(); - Preconditions.assertTrue(leaderPeerInfo.isPresent()); + Optional optionalLeaderPeerInfo = peerList.stream() + .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst(); + leaderPeerInfo = (optionalLeaderPeerInfo.isPresent()) ? optionalLeaderPeerInfo.get() : null; } // For the cases where RaftConf is empty on newly started peer with // empty peer list, we retrieve leader info from // installSnapShotRequestProto. - RoleInfoProto roleInfoProto = - getRaftConf().getPeer(state.getLeaderId()) == null ? - getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) : - getRoleInfoProto(); + RoleInfoProto roleInfoProto; + RaftPeer raftPeer = getRaftConf().getPeer(state.getLeaderId()); + if (raftPeer == null && leaderPeerInfo != null) { + roleInfoProto = getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo)); + } else if (raftPeer == null) { + throw new IOException("Leader peer info is unknown."); + } else { + roleInfoProto = getRoleInfoProto(); + } + // This is the first installSnapshot notify request for this term and // index. Notify the state machine to install the snapshot. LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", @@ -1633,7 +1645,7 @@ private CompletableFuture replyPendingRequest( // update the retry cache final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId); if (getInfo().isLeader()) { - Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(), + Preconditions.assertTrue(!cacheEntry.isCompletedNormally(), "retry cache entry should be pending: %s", cacheEntry); } if (cacheEntry.isFailed()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java index 438315ed7a..07f427fc1d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java @@ -181,13 +181,11 @@ public String toString() { } CacheEntry getOrCreateEntry(ClientInvocationId key) { - final CacheEntry entry; try { - entry = cache.get(key, () -> new CacheEntry(key)); + return cache.get(key, () -> new CacheEntry(key)); } catch (ExecutionException e) { throw new IllegalStateException(e); } - return entry; } CacheEntry refreshEntry(CacheEntry newEntry) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index f7303f9327..4bdba0d12f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -20,10 +20,7 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftGroupMemberId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.*; import org.apache.ratis.server.DataStreamMap; import org.apache.ratis.server.DataStreamServer; import org.apache.ratis.server.DivisionInfo; @@ -44,6 +41,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; public class RaftServerTestUtil { @@ -156,6 +154,10 @@ public static DataStreamMap newDataStreamMap(Object name) { return new DataStreamMapImpl(name); } + public static CompletableFuture streamEndOfRequestAsync(RaftServer.Division server, RaftClientRequest request) { + return ((RaftServerImpl)server).streamEndOfRequestAsync(request); + } + public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) { final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null); Assert.assertNotNull(f); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java index 9ab814cc66..795fd380ba 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java @@ -59,11 +59,15 @@ public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boole } } - public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) { - getOrCreateEntry(server.getRetryCache(), invocationId); + public static RetryCache.Entry getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) { + return getOrCreateEntryImpl(server.getRetryCache(), invocationId); } - private static RetryCache.Entry getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) { + public static RetryCache.Entry getOrCreateEntry(RetryCache retryCache, ClientInvocationId invocationId) { + return getOrCreateEntryImpl(retryCache, invocationId); + } + + private static RetryCache.Entry getOrCreateEntryImpl(RetryCache cache, ClientInvocationId invocationId) { return ((RetryCacheImpl)cache).getOrCreateEntry(invocationId); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index b17030956d..b2a6622583 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -30,7 +30,8 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.log4j.Level; import org.apache.ratis.BaseTest; -import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.*; +import org.apache.ratis.protocol.exceptions.StreamException; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; @@ -44,9 +45,6 @@ import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.server.RaftServer; @@ -74,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet { { @@ -311,9 +310,50 @@ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOExce } } + @Test + public void TestStreamEndOfRequestAsync() throws Exception { + runWithNewCluster(1, this::runTestStreamEndOfRequestAsync); + } + + void runTestStreamEndOfRequestAsync(MiniRaftClusterWithGrpc cluster) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + final RaftGroupId leaderGroupId = leader.getGroup().getGroupId(); + final RaftClient client = cluster.createClient(); + final AtomicLong seqNum = new AtomicLong(); + final RaftClientRequest clientRequest = newRaftClientRequest(client, leaderId, seqNum.incrementAndGet(), + RaftClientRequest.messageStreamRequestType(12, 12, true)); + + // Leader completes exceptionally, because there is no such stream + // Creating realistic stream is complex, since streams are created by clients, but + // this function tests server functionality. + CompletableFuture fRequest = RaftServerTestUtil.streamEndOfRequestAsync(leader, clientRequest); + Assert.assertNotNull(fRequest); + Assert.assertTrue(fRequest.isCompletedExceptionally()); + fRequest.exceptionally(e -> { + Assert.assertSame(e.getCause().getClass(), StreamException.class); + return clientRequest; + }); + + // On non leader, request should fail because only leaders handle this kind of requests + RaftServer server = cluster.putNewServer(RaftPeerId.getRaftPeerId("Server 21"), leader.getGroup(), false); + fRequest = RaftServerTestUtil.streamEndOfRequestAsync(server.getDivision(leaderGroupId), clientRequest); + Assert.assertNotNull(fRequest); + Assert.assertTrue(fRequest.isCompletedExceptionally()); + fRequest.exceptionally(e -> { + Assert.assertSame(e.getCause().getClass(), Exception.class); + return clientRequest; + }); + } + static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) { + return newRaftClientRequest(client, serverId, seqNum, RaftClientRequest.writeRequestType()); + } + + static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum, + RaftClientRequest.Type type) { final SimpleMessage m = new SimpleMessage("m" + seqNum); return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m, - RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L)); + type, ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L)); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index e1fff149b5..5ccf2d4b3c 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -24,9 +24,7 @@ import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.RatisMetricRegistry; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftGroupMemberId; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.*; import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RetryCacheTestUtil; @@ -636,6 +634,29 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) { throw ex; } + /** + * Verifies that getOrCreateEntry function creates cache entry in every case and does not return null. + */ + @Test + public void testGetOrCreateEntry() { + final RetryCache retryCache = RetryCacheTestUtil.createRetryCache(); + final ClientId clientId = ClientId.randomId(); + final long invocationId1 = 123456789; + final ClientInvocationId clientInvocationId1 = ClientInvocationId.valueOf(clientId, invocationId1); + RetryCache.Entry cacheEntry1 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1); + Assert.assertNotNull(cacheEntry1); + + RetryCache.Entry cacheEntry1Again = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1); + Assert.assertEquals(cacheEntry1.toString(), cacheEntry1Again.toString()); + + final long invocationId2 = 987654321; + final ClientInvocationId clientInvocationId2 = ClientInvocationId.valueOf(clientId, invocationId2); + RetryCache.Entry cacheEntry2 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId2); + Assert.assertNotNull(cacheEntry2); + + Assert.assertNotEquals(cacheEntry1.toString(), cacheEntry2.toString()); + } + static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) { final Thread t = new Thread(() -> { try {