Skip to content

Commit

Permalink
Fix server impl NPEs
Browse files Browse the repository at this point in the history
- Add default to streamEndOfRequestAsync function
- Throw error on notifyStateMachine... funtion
- Remove possibility for new cache entries to be null
- Add tests for changes
  • Loading branch information
softgitron committed Apr 24, 2021
1 parent 43915d2 commit dd9bdf9
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,15 @@ private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request
newExceptionReply(request, generateNotLeaderException())));
}

private CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.streamEndOfRequestAsync(request))
.orElse(null);
.map(ls -> ls.streamEndOfRequestAsync(request))
.orElseGet(() -> {
final CompletableFuture<RaftClientRequest> errorF = new CompletableFuture<>();
errorF.completeExceptionally(
new Exception("Unexpected null encountered, while receiving end of stream request."));
return errorF;
});
}

CompletableFuture<RaftClientReply> processQueryFuture(
Expand Down Expand Up @@ -1560,21 +1565,28 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
return reply;
}

Optional<RaftPeerProto> leaderPeerInfo = null;
RaftPeerProto leaderPeerInfo = null;
if (request.hasLastRaftConfigurationLogEntryProto()) {
List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
.getPeersList();
leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
Preconditions.assertTrue(leaderPeerInfo.isPresent());
Optional<RaftPeerProto> optionalLeaderPeerInfo = peerList.stream()
.filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
leaderPeerInfo = (optionalLeaderPeerInfo.isPresent()) ? optionalLeaderPeerInfo.get() : null;
}

// For the cases where RaftConf is empty on newly started peer with
// empty peer list, we retrieve leader info from
// installSnapShotRequestProto.
RoleInfoProto roleInfoProto =
getRaftConf().getPeer(state.getLeaderId()) == null ?
getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
getRoleInfoProto();
RoleInfoProto roleInfoProto;
RaftPeer raftPeer = getRaftConf().getPeer(state.getLeaderId());
if (raftPeer == null && leaderPeerInfo != null) {
roleInfoProto = getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo));
} else if (raftPeer == null) {
throw new IOException("Leader peer info is unknown.");
} else {
roleInfoProto = getRoleInfoProto();
}

// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
Expand Down Expand Up @@ -1633,7 +1645,7 @@ private CompletableFuture<Message> replyPendingRequest(
// update the retry cache
final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
if (getInfo().isLeader()) {
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
Preconditions.assertTrue(!cacheEntry.isCompletedNormally(),
"retry cache entry should be pending: %s", cacheEntry);
}
if (cacheEntry.isFailed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,11 @@ public String toString() {
}

CacheEntry getOrCreateEntry(ClientInvocationId key) {
final CacheEntry entry;
try {
entry = cache.get(key, () -> new CacheEntry(key));
return cache.get(key, () -> new CacheEntry(key));
} catch (ExecutionException e) {
throw new IllegalStateException(e);
}
return entry;
}

CacheEntry refreshEntry(CacheEntry newEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DivisionInfo;
Expand All @@ -44,6 +41,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class RaftServerTestUtil {
Expand Down Expand Up @@ -156,6 +154,10 @@ public static DataStreamMap newDataStreamMap(Object name) {
return new DataStreamMapImpl(name);
}

public static CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftServer.Division server, RaftClientRequest request) {
return ((RaftServerImpl)server).streamEndOfRequestAsync(request);
}

public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
Assert.assertNotNull(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boole
}
}

public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
getOrCreateEntry(server.getRetryCache(), invocationId);
public static RetryCache.Entry getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
return getOrCreateEntryImpl(server.getRetryCache(), invocationId);
}

private static RetryCache.Entry getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
public static RetryCache.Entry getOrCreateEntry(RetryCache retryCache, ClientInvocationId invocationId) {
return getOrCreateEntryImpl(retryCache, invocationId);
}

private static RetryCache.Entry getOrCreateEntryImpl(RetryCache cache, ClientInvocationId invocationId) {
return ((RetryCacheImpl)cache).getOrCreateEntry(invocationId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StreamException;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
Expand All @@ -44,9 +45,6 @@
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -74,6 +72,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
{
Expand Down Expand Up @@ -311,9 +310,50 @@ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOExce
}
}

@Test
public void TestStreamEndOfRequestAsync() throws Exception {
runWithNewCluster(1, this::runTestStreamEndOfRequestAsync);
}

void runTestStreamEndOfRequestAsync(MiniRaftClusterWithGrpc cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final RaftGroupId leaderGroupId = leader.getGroup().getGroupId();
final RaftClient client = cluster.createClient();
final AtomicLong seqNum = new AtomicLong();
final RaftClientRequest clientRequest = newRaftClientRequest(client, leaderId, seqNum.incrementAndGet(),
RaftClientRequest.messageStreamRequestType(12, 12, true));

// Leader completes exceptionally, because there is no such stream
// Creating realistic stream is complex, since streams are created by clients, but
// this function tests server functionality.
CompletableFuture<RaftClientRequest> fRequest = RaftServerTestUtil.streamEndOfRequestAsync(leader, clientRequest);
Assert.assertNotNull(fRequest);
Assert.assertTrue(fRequest.isCompletedExceptionally());
fRequest.exceptionally(e -> {
Assert.assertSame(e.getCause().getClass(), StreamException.class);
return clientRequest;
});

// On non leader, request should fail because only leaders handle this kind of requests
RaftServer server = cluster.putNewServer(RaftPeerId.getRaftPeerId("Server 21"), leader.getGroup(), false);
fRequest = RaftServerTestUtil.streamEndOfRequestAsync(server.getDivision(leaderGroupId), clientRequest);
Assert.assertNotNull(fRequest);
Assert.assertTrue(fRequest.isCompletedExceptionally());
fRequest.exceptionally(e -> {
Assert.assertSame(e.getCause().getClass(), Exception.class);
return clientRequest;
});
}

static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) {
return newRaftClientRequest(client, serverId, seqNum, RaftClientRequest.writeRequestType());
}

static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum,
RaftClientRequest.Type type) {
final SimpleMessage m = new SimpleMessage("m" + seqNum);
return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,
RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
type, ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
Expand Down Expand Up @@ -636,6 +634,29 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
throw ex;
}

/**
* Verifies that getOrCreateEntry function creates cache entry in every case and does not return null.
*/
@Test
public void testGetOrCreateEntry() {
final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
final ClientId clientId = ClientId.randomId();
final long invocationId1 = 123456789;
final ClientInvocationId clientInvocationId1 = ClientInvocationId.valueOf(clientId, invocationId1);
RetryCache.Entry cacheEntry1 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
Assert.assertNotNull(cacheEntry1);

RetryCache.Entry cacheEntry1Again = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
Assert.assertEquals(cacheEntry1.toString(), cacheEntry1Again.toString());

final long invocationId2 = 987654321;
final ClientInvocationId clientInvocationId2 = ClientInvocationId.valueOf(clientId, invocationId2);
RetryCache.Entry cacheEntry2 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId2);
Assert.assertNotNull(cacheEntry2);

Assert.assertNotEquals(cacheEntry1.toString(), cacheEntry2.toString());
}

static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
final Thread t = new Thread(() -> {
try {
Expand Down

0 comments on commit dd9bdf9

Please sign in to comment.