package org.opendaylight.controller.cluster.raft.behaviors;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
public class LeaderTest extends AbstractRaftActorBehaviorTest {
private final ActorRef leaderActor =
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(senderActor, new SendHeartBeat());
final String out =
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
RaftActorBehavior raftBehavior = leader
.handleMessage(senderActor, new Replicate(null, null,
new MockRaftActorContext.MockReplicatedLogEntry(1,
leader.getFollowerToSnapshot().getNextChunk();
leader.getFollowerToSnapshot().incrementChunkIndex();
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
//update follower timestamp
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
senderActor, new Replicate(null, "state-id", entry));
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
+
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+ List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+
}};
}
assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
+ @Test
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ actorContext.setConfigParams(configParams);
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(1, objectList.size());
+
+ Object o = objectList.get(0);
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(2, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(3, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ // Count should still stay at 3
+ assertEquals(3, objectList.size());
+ }};
+ }
+
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> followerActor =
assertEquals(3, installSnapshot.getTotalChunks());
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, new SendHeartBeat());
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+ o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
{
TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
o = MessageCollectorActor.getAllMessages(followerActor).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
}};
}
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive("follower-reply");
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ List<Object> entries = ForwardMessageToBehaviorActor
+ .getAllMatching(followerActor, AppendEntries.class);
+
+ assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+ AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+ assertEquals(1, appendEntriesSecond.getLeaderCommit());
+ assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+ assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+ }};
+ }
+
class MockLeader extends Leader {
FollowerToSnapshot fts;