import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
// Wait slightly longer than heartbeat duration
Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
for(int i=0;i<3;i++) {
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
followerActor.underlyingActor().clear();
Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
sendReplicate(actorContext, lastIndex+1);
List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
//InstallSnapshotReply received
fts.markSendStatus(true);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(3, installSnapshot.getTotalChunks());
assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
- int hashCode = installSnapshot.getData().hashCode();
+ int hashCode = Arrays.hashCode(installSnapshot.getData());
followerActor.underlyingActor().clear();
j = barray.length;
}
- ByteString chunk = fts.getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ byte[] chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
fts.markSendStatus(true);
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader = new Leader(leaderActorContext);
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+ assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
short payloadVersion = 5;
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
- FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals(2, followerInfo.getMatchIndex());
+ assertEquals(3, followerInfo.getNextIndex());
assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+ assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
}
@Test
MockRaftActorContext leaderActorContext = createActorContext();
leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
Assert.assertTrue(behavior instanceof Leader);
}
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
Assert.assertTrue("Behavior not instance of Leader when all followers are active",
behavior instanceof Leader);
leader.markFollowerInActive("follower-1");
leader.markFollowerActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
behavior instanceof Leader);
assertEquals(termMsg2.getActor(), followerActor2);
leader.markFollowerInActive("follower-2");
- return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
}
@Test
MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
// Send reply from the voting follower and verify consensus.
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
}
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
verify(mockTransferCohort).abortTransfer();