import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
MockRaftActorContext actorContext = createActorContext();
follower = new Follower(actorContext);
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
}
@Test
- public void testHandleElectionTimeout(){
- logStart("testHandleElectionTimeout");
+ public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
+ logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
- follower = new Follower(createActorContext());
+ MockRaftActorContext context = createActorContext();
+ follower = new Follower(context);
- RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
assertTrue(raftBehavior instanceof Candidate);
}
@Test
- public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
+ logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
+
+ MockRaftActorContext context = createActorContext();
+ ((DefaultConfigParamsImpl) context.getConfigParams()).
+ setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
+
+ follower = new Follower(context);
+ context.setCurrentBehavior(follower);
+
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+ getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ -1, -1, (short) 1));
+
+ Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
+ RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+ assertTrue(raftBehavior instanceof Follower);
+
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+ getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ -1, -1, (short) 1));
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+ assertTrue(raftBehavior instanceof Follower);
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
MockRaftActorContext context = createActorContext();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
ByteString bsSnapshot = createSnapshot();
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
// Check that snapshot installation is not in progress
assertNotNull(follower.getSnapshotTracker());
// Send an append entry
- AppendEntries appendEntries = mock(AppendEntries.class);
- doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
- assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
- assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ assertNotNull(follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(follower.getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(follower.getSnapshotTracker());
- // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
- verify(appendEntries, never()).getPrevLogIndex();
+ // Send appendEntries with a new term and leader.
+ AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+ assertEquals("getTerm", 2, reply.getTerm());
+
+ assertNull(follower.getSnapshotTracker());
}
@Test
logStart("testInitialSyncUpWithHandleInstallSnapshot");
MockRaftActorContext context = createActorContext();
+ context.setCommitIndex(-1);
follower = createBehavior(context);
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
- int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
int lastIncludedIndex = 1;
int chunkIndex = 1;
InstallSnapshot lastInstallSnapshot = null;
follower = createBehavior(context);
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+ assertTrue("Expected Candidate", newBehavior instanceof Candidate);
}
@Test
- public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+ public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
MockRaftActorContext context = createActorContext();
context.setConfigParams(new DefaultConfigParamsImpl(){
@Override
follower = createBehavior(context);
- MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+ assertSame("handleMessage result", follower, newBehavior);
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionIfNonVoting(){
+ MockRaftActorContext context = createActorContext();
+ context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
+
+ follower = new Follower(context, "leader", (short)1);
+
+ ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
+ ElectionTimeout.class);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+ assertSame("handleMessage result", follower, newBehavior);
+ assertNull("Expected null leaderId", follower.getLeaderId());
}
@Test
if (chunkSize > snapshotLength) {
size = snapshotLength;
} else {
- if ((start + chunkSize) > snapshotLength) {
+ if (start + chunkSize > snapshotLength) {
size = snapshotLength - start;
}
}