* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
+import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (leader != null) {
leader.close();
}
}
@Test
- public void testHandleMessageForUnknownMessage() throws Exception {
+ public void testHandleMessageForUnknownMessage() {
logStart("testHandleMessageForUnknownMessage");
leader = new Leader(createActorContext());
}
@Test
- public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
return sendReplicate(actorContext, 1, index);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+ final long index) {
+ return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+ }
+
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+ final Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
}
@Test
- public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+ public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
MockRaftActorContext actorContext = createActorContextWithFollower();
sendReplicate(actorContext, lastIndex + i + 1);
leader.handleMessage(followerActor, new AppendEntriesReply(
FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
}
- for (int i = 3; i < 5; i++) {
- sendReplicate(actorContext, lastIndex + i + 1);
+ // We are expecting six messages here -- a request to replicate and a consensus-reached message
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+ for (int i = 0; i < 3; i++) {
+ assertRequestEntry(lastIndex, allMessages, i);
+ assertCommitEntry(lastIndex, allMessages, i);
}
- List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
- // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
- // get sent to the follower - but not the 5th
- assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+ // Now perform another commit, eliciting a request to persist
+ sendReplicate(actorContext, lastIndex + 3 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // This elicits another message for request to replicate
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ assertRequestEntry(lastIndex, allMessages, 3);
- for (int i = 0; i < 4; i++) {
- long expected = allMessages.get(i).getEntries().get(0).getIndex();
- assertEquals(expected, i + 2);
- }
+ sendReplicate(actorContext, lastIndex + 4 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ }
+
+ private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+ assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+ assertEquals(ImmutableList.of(), commitReq.getEntries());
+ }
+
+ private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries req = allMessages.get(2 * messageNr);
+ assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+ final List<ReplicatedLogEntry> entries = req.getEntries();
+ assertEquals(1, entries.size());
+ assertEquals(messageNr + 2, entries.get(0).getIndex());
}
@Test
- public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
MockRaftActorContext actorContext = createActorContextWithFollower();
@Test
- public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() {
logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
MockRaftActorContext actorContext = createActorContext();
leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
- for (int i = 0; i <= newLogIndex - 1; i++ ) {
+ for (int i = 0; i <= newLogIndex - 1; i++) {
ApplyState applyState = applyStateList.get(i);
assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
//send first chunk and no InstallSnapshotReply received yet
}
@Test
- public void testSendAppendEntriesSnapshotScenario() throws Exception {
+ public void testSendAppendEntriesSnapshotScenario() {
logStart("testSendAppendEntriesSnapshotScenario");
final MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testInitiateInstallSnapshot() throws Exception {
+ public void testInitiateInstallSnapshot() {
logStart("testInitiateInstallSnapshot");
MockRaftActorContext actorContext = createActorContextWithFollower();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(null);
+ leader.setSnapshotHolder(null);
// new entry
SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
actorContext.getReplicatedLog().removeFrom(0);
+ AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
+
leader = new Leader(actorContext);
actorContext.setCurrentBehavior(leader);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(null);
+ leader.setSnapshotHolder(null);
for (int i = 0; i < 4; i++) {
actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
// Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
// installed with a SendInstallSnapshot
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it should not initiate Capture
+ assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+ assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Sending Replicate message should not initiate another capture since the first is in progress.
leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+ // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+ final byte[] bytes = new byte[]{1, 2, 3};
+ installSnapshotStream.get().get().write(bytes);
+ actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+ Runtime.getRuntime().totalMemory());
+ MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+ MessageCollectorActor.clearMessages(followerActor);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
+ MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}
@Test
- public void testInstallSnapshot() throws Exception {
+ public void testInstallSnapshot() {
logStart("testInstallSnapshot");
final MockRaftActorContext actorContext = createActorContextWithFollower();
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
}
@Test
- public void testForceInstallSnapshot() throws Exception {
+ public void testForceInstallSnapshot() {
logStart("testForceInstallSnapshot");
final MockRaftActorContext actorContext = createActorContextWithFollower();
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
}
@Test
- public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ public void testSendSnapshotfromInstallSnapshotReply() {
logStart("testSendSnapshotfromInstallSnapshotReply");
MockRaftActorContext actorContext = createActorContextWithFollower();
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
MockRaftActorContext actorContext = createActorContextWithFollower();
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
}
@Test
- public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
MockRaftActorContext actorContext = createActorContextWithFollower();
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
- installSnapshot.getLastChunkHashCode().get().intValue());
+ installSnapshot.getLastChunkHashCode().getAsInt());
final int hashCode = Arrays.hashCode(installSnapshot.getData());
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
}
@Test
- public void testLeaderInstallSnapshotState() {
+ public void testLeaderInstallSnapshotState() throws IOException {
logStart("testLeaderInstallSnapshotState");
Map<String, String> leadersSnapshot = new HashMap<>();
byte[] barray = bs.toByteArray();
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
- fts.setSnapshotBytes(bs);
+ fts.setSnapshotBytes(ByteSource.wrap(barray));
assertEquals(bs.size(), barray.length);
}
assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+ fts.close();
}
@Override
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
return createActorContext(LEADER_ID, actorRef);
}
- private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
}
@Test
- public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
}
@Test
- public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
final MockRaftActorContext leaderActorContext = createActorContext();
}
@Test
- public void testHandleAppendEntriesReplySuccess() throws Exception {
+ public void testHandleAppendEntriesReplySuccess() {
logStart("testHandleAppendEntriesReplySuccess");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
assertTrue("Expected Leader", newBehavior instanceof Leader);
}
- private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
// kill 1 follower and verify if that got killed
- final JavaTestKit probe = new JavaTestKit(getSystem());
+ final TestKit probe = new TestKit(getSystem());
probe.watch(followerActor1);
followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
}
@Test
- public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ public void testIsolatedLeaderCheckTwoFollowers() {
logStart("testIsolatedLeaderCheckTwoFollowers");
RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
}
@Test
- public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+ public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
}
@Test
- public void testLaggingFollowerStarvation() throws Exception {
+ public void testLaggingFollowerStarvation() {
logStart("testLaggingFollowerStarvation");
String leaderActorId = actorFactory.generateActorId("leader");
leaderActorContext.setLastApplied(-1);
String nonVotingFollowerId = "nonvoting-follower";
- TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+ ActorRef nonVotingFollowerActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
VotingState.NON_VOTING);
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
+ @Test
+ public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+ logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+ final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+ Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+ final MockRaftActorContext.MockPayload largePayload =
+ new MockRaftActorContext.MockPayload("large", serializedSize);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(300, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send normal payload first to prime commit index.
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ sendReplicate(leaderActorContext, term, 0);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+ assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+ sendReplicate(leaderActorContext, term, 1, largePayload);
+
+ MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+ assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+ final Identifier slicingId = messageSlice.getIdentifier();
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+ messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send another normal payload.
+
+ sendReplicate(leaderActorContext, term, 2);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+ }
+
+ @Test
+ public void testLargePayloadSlicingExpiration() {
+ logStart("testLargePayloadSlicingExpiration");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+ leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+ // Sleep for at least 3 * election timeout so the slicing state expires.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+ MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send an AppendEntriesReply - this should restart the slicing.
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ }
+
+ @Test
+ public void testLeaderAddressInAppendEntries() {
+ logStart("testLeaderAddressInAppendEntries");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat shouldn't have the leader address
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower needs the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+ RaftVersions.CURRENT_VERSION));
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertTrue(appendEntries.getLeaderAddress().isPresent());
+ assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower does not need the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+ RaftVersions.CURRENT_VERSION));
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ }
+
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
- MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
- super();
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
this.snapshotChunkSize = snapshotChunkSize;
}