1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertTrue;
5 import akka.actor.ActorRef;
6 import akka.actor.Props;
7 import akka.testkit.TestActorRef;
8 import com.google.protobuf.ByteString;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.HashMap;
12 import java.util.List;
13 import org.junit.After;
14 import org.junit.Assert;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
17 import org.opendaylight.controller.cluster.raft.RaftActorContext;
18 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
19 import org.opendaylight.controller.cluster.raft.Snapshot;
20 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
21 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
27 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
28 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
29 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
// Tests for the Follower behavior, built on the shared AbstractRaftActorBehaviorTest base class.
31 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Message-collecting test actor that backs the follower's context (see createActorContext());
// messages the follower sends to itself, such as ElectionTimeout, are read from it.
33 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
34 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Message-collecting test actor passed as the sender of RPCs in the tests; the follower's
// replies are read back from it.
36 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
37 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by the individual tests and null-checked in tearDown().
39 private RaftActorBehavior follower;
// Test tear-down hook. The visible portion only checks whether a test assigned the
// follower field.
// NOTE(review): the cleanup performed inside this guard is not visible in this listing.
43 public void tearDown() throws Exception {
44 if(follower != null) {
// Creates the behavior under test: a new Follower bound to the given actor context.
52 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
53 return new Follower(actorContext);
// Creates a mock actor context backed by the followerActor test actor.
57 protected MockRaftActorContext createActorContext() {
58 return createActorContext(followerActor);
// Creates a MockRaftActorContext with the id "follower", built from getSystem() and the
// supplied actor reference.
62 protected MockRaftActorContext createActorContext(ActorRef actorRef){
63 return new MockRaftActorContext("follower", getSystem(), actorRef);
// Verifies that after a Follower is constructed an ElectionTimeout message shows up at
// followerActor, without the test sending anything itself.
67 public void testThatAnElectionTimeoutIsTriggered(){
68 MockRaftActorContext actorContext = createActorContext();
69 follower = new Follower(actorContext);
// The third argument is six times the configured election timeout interval, in milliseconds;
// presumably it is the maximum time to wait for the message.
71 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
72 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that handling an ElectionTimeout message makes the follower return a
// Candidate behavior.
76 public void testHandleElectionTimeout(){
77 logStart("testHandleElectionTimeout");
79 follower = new Follower(createActorContext());
81 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
// The behavior returned by handleMessage must be a Candidate.
83 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote carrying the same term as the follower's term information is
// granted when that term information was stored with a null second argument.
// NOTE(review): the declaration of `term` is on a line not visible in this listing.
87 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
88 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
90 RaftActorContext context = createActorContext();
// Store the term together with null (per the test name: no vote recorded yet).
92 context.getTermInformation().update(term, null);
94 follower = createBehavior(context);
// The vote request uses the very same term value.
96 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
98 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
// The reply must grant the vote and echo the term.
100 assertEquals("isVoteGranted", true, reply.isVoteGranted());
101 assertEquals("getTerm", term, reply.getTerm());
// Verifies that a RequestVote carrying the same term is refused when the follower's term
// information was stored with "test" while the request names "candidate".
// NOTE(review): the declaration of `term` is on a line not visible in this listing.
105 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
106 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
108 RaftActorContext context = createActorContext();
// Store the term together with "test" (per the test name: the id already voted for).
110 context.getTermInformation().update(term, "test");
112 follower = createBehavior(context);
// The request comes from a different id, "candidate".
114 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
116 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
// The vote must not be granted.
118 assertEquals("isVoteGranted", false, reply.isVoteGranted());
122 * This test verifies that when an AppendEntries RPC is received by a RaftActor
123 * with a commitIndex that is greater than what has been applied to the
124 * state machine of the RaftActor, the RaftActor applies the state and
125 * sets its current applied state to the commitIndex of the sender.
// Verifies that after handling an AppendEntries whose commit index (101) is ahead of the
// context's last applied index (100), the context reports 101 as last applied.
130 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
131 logStart("testHandleAppendEntriesWithNewerCommitIndex");
133 MockRaftActorContext context = createActorContext();
// Starting state: last applied is 100, the last log entry is (term 1, index 100) and the
// log's snapshot index is 99.
135 context.setLastApplied(100);
136 setLastLogEntry(context, 1, 100,
137 new MockRaftActorContext.MockPayload(""));
138 context.getReplicatedLog().setSnapshotIndex(99);
// A single new entry (term 2, index 101) is sent along with the message.
140 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
141 newReplicatedLogEntry(2, 101, "foo"));
143 // The new commitIndex is 101
144 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
146 follower = createBehavior(context);
147 follower.handleMessage(leaderActor, appendEntries);
// Last applied must have advanced from 100 to 101.
149 assertEquals("getLastApplied", 101L, context.getLastApplied());
153 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
154 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
155 * then the RaftActor does not change its state and it returns a failure.
// Verifies that an AppendEntries sent with term 100 (the receiver's term is 95) and a null
// entries list is answered with an unsuccessful AppendEntriesReply, and that handleMessage
// returns the same Follower instance.
160 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
161 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
163 MockRaftActorContext context = createActorContext();
165 // First set the receiver's term to a lower number
166 context.getTermInformation().update(95, "test");
168 // AppendEntries is now sent with a bigger term
169 // this will set the receiver's term to be the same as the sender's term
170 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
172 follower = createBehavior(context);
174 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch: the very same Follower instance must be handed back.
176 Assert.assertSame(follower, newBehavior);
178 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
179 AppendEntriesReply.class);
// The reply must report failure.
181 assertEquals("isSuccess", false, reply.isSuccess());
185 * This test verifies that when a new AppendEntries message is received with
186 * new entries and the logs of the sender and receiver match that the new
187 * entries get added to the log and the log is incremented by the number of
188 * entries received in appendEntries
// Verifies that when the receiver's log (indices 0..2, term 1) lines up with the incoming
// AppendEntries, the two sent entries (indices 3 and 4) are appended and a successful reply
// reporting last term 1 / last index 4 is sent to leaderActor.
193 public void testHandleAppendEntriesAddNewEntries() {
194 logStart("testHandleAppendEntriesAddNewEntries");
196 MockRaftActorContext context = createActorContext();
198 // Set the receiver's term to 1, the same term the AppendEntries below is sent with
199 context.getTermInformation().update(1, "test");
201 // Prepare the receiver's log
202 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
203 log.append(newReplicatedLogEntry(1, 0, "zero"));
204 log.append(newReplicatedLogEntry(1, 1, "one"));
205 log.append(newReplicatedLogEntry(1, 2, "two"));
207 context.setReplicatedLog(log);
209 // Prepare the entries to be sent with AppendEntries
210 List<ReplicatedLogEntry> entries = new ArrayList<>();
211 entries.add(newReplicatedLogEntry(1, 3, "three"));
212 entries.add(newReplicatedLogEntry(1, 4, "four"));
214 // Send appendEntries with the same term as was set on the receiver
215 // before the new behavior was created (1 in this case)
216 // This will not work for a Candidate because as soon as a Candidate
217 // is created it increments the term
218 AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
220 follower = createBehavior(context);
222 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch: the very same Follower instance must be handed back.
224 Assert.assertSame(follower, newBehavior);
// The log must now end at index 4 and hold the two sent entries at indices 3 and 4.
226 assertEquals("Next index", 5, log.last().getIndex() + 1);
227 assertEquals("Entry 3", entries.get(0), log.get(3));
228 assertEquals("Entry 4", entries.get(1), log.get(4));
// Expected reply: term 1, success, this follower's id, last log term 1, last log index 4.
230 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
234 * This test verifies that when a new AppendEntries message is received with
235 * new entries and the logs of the sender and receiver are out-of-sync that
236 * the log is first corrected by removing the out of sync entries from the
237 * log and then adding in the new entries sent with the AppendEntries message
// Verifies that when the incoming entries conflict with the receiver's log (the receiver has
// index 2 at term 1, the sender supplies index 2 at term 2), the receiver's log ends up with
// the sender's entries at indices 2 and 3 and a successful reply is sent to leaderActor.
240 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
241 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
243 MockRaftActorContext context = createActorContext();
245 // First set the receiver's term to a lower number
246 context.getTermInformation().update(1, "test");
248 // Prepare the receiver's log
249 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
250 log.append(newReplicatedLogEntry(1, 0, "zero"));
251 log.append(newReplicatedLogEntry(1, 1, "one"));
252 log.append(newReplicatedLogEntry(1, 2, "two"));
254 context.setReplicatedLog(log);
256 // Prepare the entries to be sent with AppendEntries
257 List<ReplicatedLogEntry> entries = new ArrayList<>();
258 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
259 entries.add(newReplicatedLogEntry(2, 3, "three"));
261 // Send appendEntries with term 2, which is newer than the term that was
262 // set on the receiver (1). The third argument (presumably prevLogIndex) is 1,
263 // so the first sent entry (index 2, term 2) lands on the receiver's
264 // existing entry at index 2 (term 1)
265 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
267 follower = createBehavior(context);
269 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch: the very same Follower instance must be handed back.
271 Assert.assertSame(follower, newBehavior);
273 // The entry at index 2 will be found out-of-sync with the leader
274 // and will be removed
275 // Then the two new entries will be added to the log
276 // Thus making the log to have 4 entries
277 assertEquals("Next index", 4, log.last().getIndex() + 1);
278 //assertEquals("Entry 2", entries.get(0), log.get(2));
// The entry at index 1 must be untouched.
280 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
282 // Check that the entry at index 2 has the new data
283 assertEquals("Entry 2", entries.get(0), log.get(2));
285 assertEquals("Entry 3", entries.get(1), log.get(3));
// Expected reply: term 2, success, this follower's id, last log term 2, last log index 3.
287 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Verifies that an AppendEntries built on an index the receiver does not have (third
// argument 3, while the receiver's log only holds indices 0..2) is answered with an
// unsuccessful reply that reports the receiver's own last entry (term 1, index 2).
291 public void testHandleAppendEntriesPreviousLogEntryMissing(){
292 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
294 MockRaftActorContext context = createActorContext();
296 // Prepare the receiver's log
297 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
298 log.append(newReplicatedLogEntry(1, 0, "zero"));
299 log.append(newReplicatedLogEntry(1, 1, "one"));
300 log.append(newReplicatedLogEntry(1, 2, "two"));
302 context.setReplicatedLog(log);
304 // Prepare the entries to be sent with AppendEntries
305 List<ReplicatedLogEntry> entries = new ArrayList<>();
306 entries.add(newReplicatedLogEntry(1, 4, "four"));
// The sent entry has index 4; the receiver has nothing at index 3.
308 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
310 follower = createBehavior(context);
312 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch: the very same Follower instance must be handed back.
314 Assert.assertSame(follower, newBehavior);
// Expected reply: term 1, failure, this follower's id, last log term 1, last log index 2.
316 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the receiver already holds succeeds without growing the
// log, and that re-sending it together with one new entry appends only the new one.
320 public void testHandleAppendEntriesWithExistingLogEntry() {
321 logStart("testHandleAppendEntriesWithExistingLogEntry");
323 MockRaftActorContext context = createActorContext();
325 context.getTermInformation().update(1, "test");
327 // Prepare the receiver's log
328 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
329 log.append(newReplicatedLogEntry(1, 0, "zero"));
330 log.append(newReplicatedLogEntry(1, 1, "one"));
332 context.setReplicatedLog(log);
334 // Send the last entry again.
335 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
337 follower = createBehavior(context);
339 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
// The log still ends at index 1.
341 assertEquals("Next index", 2, log.last().getIndex() + 1);
342 assertEquals("Entry 1", entries.get(0), log.get(1));
344 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
346 // Send the last entry again and also a new one.
348 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so that the next expectFirstMatching sees the reply to this second message.
350 leaderActor.underlyingActor().clear();
351 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
// The log now ends at index 2.
353 assertEquals("Next index", 3, log.last().getIndex() + 1);
354 assertEquals("Entry 1", entries.get(0), log.get(1));
355 assertEquals("Entry 2", entries.get(1), log.get(2));
357 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Verifies that a follower whose log holds no entries but has snapshot index 3 / snapshot
// term 1 accepts an AppendEntries carrying the entry at index 4 and replies with success,
// reporting last log term 1 and last log index 4.
361 public void testHandleAppendAfterInstallingSnapshot(){
362 logStart("testHandleAppendAfterInstallingSnapshot");
364 MockRaftActorContext context = createActorContext();
366 // Prepare the receiver's log
367 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
369 // Set up a log as if it has been snapshotted
370 log.setSnapshotIndex(3);
371 log.setSnapshotTerm(1);
373 context.setReplicatedLog(log);
375 // Prepare the entries to be sent with AppendEntries
376 List<ReplicatedLogEntry> entries = new ArrayList<>();
377 entries.add(newReplicatedLogEntry(1, 4, "four"));
// The third and fourth arguments (3, 1) equal the snapshot index and term set above.
379 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
381 follower = createBehavior(context);
383 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch: the very same Follower instance must be handed back.
385 Assert.assertSame(follower, newBehavior);
// Expected reply: term 1, success, this follower's id, last log term 1, last log index 4.
387 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
392 * This test verifies that when InstallSnapshot is received by
393 * the follower it is applied correctly.
// Sends a serialized map to the follower as a sequence of InstallSnapshot chunks and verifies
// that (a) an ApplySnapshot carrying the reassembled state and the last-included index/term
// reaches followerActor, (b) leaderActor receives one successful InstallSnapshotReply per
// chunk, and (c) the follower's snapshot tracker is null afterwards.
// NOTE(review): the declarations of `offset`, `chunkSize` and `chunkIndex` (and whatever else
// happens at the end of the loop body) are on lines not visible in this listing.
398 public void testHandleInstallSnapshot() throws Exception {
399 logStart("testHandleInstallSnapshot");
401 MockRaftActorContext context = createActorContext();
403 follower = createBehavior(context);
// Build the snapshot payload: a three-entry map serialized to a ByteString.
405 HashMap<String, String> followerSnapshot = new HashMap<>();
406 followerSnapshot.put("1", "A");
407 followerSnapshot.put("2", "B");
408 followerSnapshot.put("3", "C");
410 ByteString bsSnapshot = toByteString(followerSnapshot);
412 int snapshotLength = bsSnapshot.size();
// Number of chunks: snapshotLength / chunkSize, rounded up.
414 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
415 int lastIncludedIndex = 1;
// Keeps the last message sent so its index/term can be compared with the applied snapshot.
417 InstallSnapshot lastInstallSnapshot = null;
// Feed the follower one chunk per iteration.
419 for(int i = 0; i < totalChunks; i++) {
420 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
421 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
422 chunkData, chunkIndex, totalChunks);
423 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the literal 50 presumably equals chunkSize; confirm against its declaration.
424 offset = offset + 50;
// The follower must have sent an ApplySnapshot to followerActor.
429 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
430 ApplySnapshot.class);
431 Snapshot snapshot = applySnapshot.getSnapshot();
// Index/term fields of the snapshot must equal the last-included index/term of the message,
// and its state must equal the original bytes.
432 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
433 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
434 snapshot.getLastAppliedTerm());
435 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
436 snapshot.getLastAppliedIndex());
437 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
438 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
// One reply per chunk must have reached leaderActor.
440 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
441 leaderActor, InstallSnapshotReply.class);
442 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
// Replies are checked in order: consecutive chunk indexes, term 1, success, this follower's id.
445 for(InstallSnapshotReply reply: replies) {
446 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
447 assertEquals("getTerm", 1, reply.getTerm());
448 assertEquals("isSuccess", true, reply.isSuccess());
449 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
// No snapshot tracker may be left on the follower.
452 Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
// Verifies that an InstallSnapshot whose first received chunk is numbered 3 (of 3) is
// answered with an unsuccessful InstallSnapshotReply carrying chunk index -1, and that the
// follower's snapshot tracker is null afterwards.
456 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
457 logStart("testHandleOutOfSequenceInstallSnapshot");
459 MockRaftActorContext context = createActorContext();
461 follower = createBehavior(context);
// Build the snapshot payload: a three-entry map serialized to a ByteString.
463 HashMap<String, String> followerSnapshot = new HashMap<>();
464 followerSnapshot.put("1", "A");
465 followerSnapshot.put("2", "B");
466 followerSnapshot.put("3", "C");
468 ByteString bsSnapshot = toByteString(followerSnapshot);
// Send a chunk labelled 3 of 3 without having sent chunks 1 and 2 first.
470 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
471 getNextChunk(bsSnapshot, 10, 50), 3, 3);
472 follower.handleMessage(leaderActor, installSnapshot);
474 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
475 InstallSnapshotReply.class);
// The reply must report failure with chunk index -1, term 1 and this follower's id.
477 assertEquals("isSuccess", false, reply.isSuccess());
478 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
479 assertEquals("getTerm", 1, reply.getTerm());
480 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
// No snapshot tracker may be left on the follower.
482 Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
// Returns the slice of bs that begins at `start` and is at most chunkSize bytes long, shortened
// where necessary so that it does not run past the end of bs.
// NOTE(review): `start` is declared on a line not visible in this listing; presumably it is
// initialised from `offset` - confirm.
485 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
486 int snapshotLength = bs.size();
488 int size = chunkSize;
// A chunk can never be longer than the whole snapshot.
489 if (chunkSize > snapshotLength) {
490 size = snapshotLength;
// A chunk that would run past the end is trimmed to the bytes remaining after `start`.
492 if ((start + chunkSize) > snapshotLength) {
493 size = snapshotLength - start;
496 return bs.substring(start, start + size);
// Fetches the first AppendEntriesReply collected by leaderActor and asserts that its success
// flag, term, follower id, last log term and last log index equal the expected values passed in.
499 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
500 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
502 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
503 AppendEntriesReply.class);
505 assertEquals("isSuccess", expSuccess, reply.isSuccess());
506 assertEquals("getTerm", expTerm, reply.getTerm());
507 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
508 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
509 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
// Builds a MockReplicatedLogEntry with the given term and index whose payload is a
// MockPayload wrapping the given data string.
512 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
513 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
514 new MockRaftActorContext.MockPayload(data));
// Runs the superclass check first and then additionally asserts the context's votedFor value:
// it must equal the candidate id when the RPC is a RequestVote, and be null for any other RPC.
518 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
519 ActorRef actorRef, RaftRPC rpc) throws Exception {
520 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
// Only a RequestVote carries a candidate id; for other RPC types null is expected.
522 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
523 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
// Expects an AppendEntriesReply on the given reply actor and asserts that it reports success.
// NOTE(review): the remainder of the method signature is on a line not visible in this listing.
527 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
529 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
530 assertEquals("isSuccess", true, reply.isSuccess());