1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.Props;
8 import akka.testkit.JavaTestKit;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
// Tests for the Follower behavior; extends AbstractRaftActorBehaviorTest.
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Actor passed to the mock context by the no-argument createActorContext();
// it is created from DoNothingActor.
36 private final ActorRef followerActor = getSystem().actorOf(Props.create(
37 DoNothingActor.class));
// Supplies the behavior under test: a new Follower constructed with the given context.
40 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
41 return new Follower(actorContext);
// Creates a mock context that uses the followerActor field as its actor;
// delegates to createActorContext(ActorRef).
45 protected MockRaftActorContext createActorContext() {
46 return createActorContext(followerActor);
// Creates a MockRaftActorContext from the literal "test" (presumably the member id -
// confirm against MockRaftActorContext), the test actor system and the supplied actor.
50 protected MockRaftActorContext createActorContext(ActorRef actorRef){
51 return new MockRaftActorContext("test", getSystem(), actorRef);
// Creates a Follower whose context is backed by the test actor and expects an
// ElectionTimeout message to be observed within six heartbeat intervals.
// Presumably the Follower schedules that timeout itself on construction - confirm in Follower.
55 public void testThatAnElectionTimeoutIsTriggered(){
56 new JavaTestKit(getSystem()) {{
58 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
60 protected void run() {
// The test actor is used as the context's actor so messages sent to it can be matched below.
62 Follower follower = new Follower(createActorContext(getTestActor()));
// Wait up to six heartbeat intervals for a message; match() checks for ElectionTimeout.
64 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
65 // do not put code outside this method, will run afterwards
67 protected Boolean match(Object in) {
68 if (in instanceof ElectionTimeout) {
// The matcher result must be true for the test to pass.
76 assertEquals(true, out);
// Sends an ElectionTimeout to a Follower and checks that the behavior returned by
// handleMessage is a Candidate.
83 public void testHandleElectionTimeout(){
84 RaftActorContext raftActorContext = createActorContext();
86 new Follower(raftActorContext);
// followerActor is passed as the sender of the ElectionTimeout.
88 RaftActorBehavior raftBehavior =
89 follower.handleMessage(followerActor, new ElectionTimeout());
// The returned behavior is expected to be a Candidate instance.
91 assertTrue(raftBehavior instanceof Candidate);
// Checks that the RequestVoteReply reports the vote as granted when the RequestVote
// carries the same leading value (1000) the context's term information was set to and
// the second term-information value is null (votedFor, per the test name).
95 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96 new JavaTestKit(getSystem()) {{
98 new Within(duration("1 seconds")) {
100 protected void run() {
// The context is backed by the test actor so the reply can be matched below.
102 RaftActorContext context = createActorContext(getTestActor());
// Term information becomes (1000, null).
104 context.getTermInformation().update(1000, null);
106 RaftActorBehavior follower = createBehavior(context);
// The RequestVote starts with 1000, matching the value set on the context above.
108 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
// Wait up to one second for a RequestVoteReply and extract isVoteGranted().
110 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
111 // do not put code outside this method, will run afterwards
113 protected Boolean match(Object in) {
114 if (in instanceof RequestVoteReply) {
115 RequestVoteReply reply = (RequestVoteReply) in;
116 return reply.isVoteGranted();
// The vote must have been granted.
123 assertEquals(true, out);
// Checks that the RequestVoteReply reports the vote as NOT granted when the context's
// term information is (1000, "test") and the RequestVote names "candidate" with the
// same leading value 1000.
130 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
131 new JavaTestKit(getSystem()) {{
133 new Within(duration("1 seconds")) {
135 protected void run() {
// The context is backed by the test actor so the reply can be matched below.
137 RaftActorContext context = createActorContext(getTestActor());
// Term information becomes (1000, "test"); "test" differs from the "candidate" used below.
139 context.getTermInformation().update(1000, "test");
141 RaftActorBehavior follower = createBehavior(context);
143 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
// Wait up to one second for a RequestVoteReply and extract isVoteGranted().
145 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
146 // do not put code outside this method, will run afterwards
148 protected Boolean match(Object in) {
149 if (in instanceof RequestVoteReply) {
150 RequestVoteReply reply = (RequestVoteReply) in;
151 return reply.isVoteGranted();
// The vote must have been refused.
158 assertEquals(false, out);
165 * This test verifies that when an AppendEntries RPC is received by a RaftActor
166 * with a commitIndex that is greater than what has been applied to the
167 * state machine of the RaftActor, the RaftActor applies the state and
168 * sets its current applied state to the commitIndex of the sender.
// Starts with lastApplied at 100, sends an AppendEntries whose commit index is 101
// and checks that lastApplied on the context has become 101 afterwards.
173 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
174 new JavaTestKit(getSystem()) {{
176 RaftActorContext context =
177 createActorContext();
// Initial state: lastApplied 100, a last log entry created with (1, 100) and an
// empty payload, and the replicated log's snapshot index set to 99.
179 context.setLastApplied(100);
180 setLastLogEntry((MockRaftActorContext) context, 1, 100,
181 new MockRaftActorContext.MockPayload(""));
182 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
// A single mock entry created with (2, 101) and payload "foo" is sent.
184 List<ReplicatedLogEntry> entries =
186 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
187 new MockRaftActorContext.MockPayload("foo"))
190 // The new commitIndex is 101
191 AppendEntries appendEntries =
192 new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
194 RaftActorBehavior raftBehavior =
195 createBehavior(context).handleMessage(getRef(), appendEntries);
// lastApplied must now equal the commit index carried by the message.
197 assertEquals(101L, context.getLastApplied());
203 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
204 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
205 * then the RaftActor does not change its state and it returns a failure.
// Sends an AppendEntries whose prevLogTerm differs from the term of the receiver's last
// log entry and checks that the returned behavior is unchanged and that the
// AppendEntriesReply reports failure.
210 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
212 new JavaTestKit(getSystem()) {{
214 MockRaftActorContext context = createActorContext();
216 // First set the receiver's term to a lower number (95) than the sender's (100)
217 context.getTermInformation().update(95, "test");
219 // Set the last log entry term for the receiver to be greater than
220 // what we will be sending as the prevLogTerm in AppendEntries
221 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
222 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
224 // AppendEntries is now sent with a bigger term
225 // this will set the receiver's term to be the same as the sender's term
226 AppendEntries appendEntries =
227 new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
229 RaftActorBehavior behavior = createBehavior(context);
231 // Send an unknown message so that the state of the RaftActor remains unchanged
232 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
234 RaftActorBehavior raftBehavior =
235 behavior.handleMessage(getRef(), appendEntries);
// Handling the AppendEntries must return the same behavior as the unknown message did.
237 assertEquals(expected, raftBehavior);
239 // Also expect an AppendEntriesReply to be sent where success is false
240 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
241 "AppendEntriesReply") {
242 // do not put code outside this method, will run afterwards
244 protected Boolean match(Object in) {
245 if (in instanceof AppendEntriesReply) {
246 AppendEntriesReply reply = (AppendEntriesReply) in;
247 return reply.isSuccess();
254 assertEquals(false, out);
263 * This test verifies that when a new AppendEntries message is received with
264 * new entries and the logs of the sender and receiver match, the new
265 * entries get added to the log and the log is incremented by the number of
266 * entries received in appendEntries
// Sends an AppendEntries with two new entries (indices 3 and 4) to a receiver whose log
// is prepared with indices 0..2, then checks that the behavior is unchanged, the log's
// last index is 4, entries 3 and 4 exist, and the AppendEntriesReply reports success.
271 public void testHandleAppendEntriesAddNewEntries() throws Exception {
272 new JavaTestKit(getSystem()) {{
274 MockRaftActorContext context = createActorContext();
276 // Set the receiver's term to 1, the same term the AppendEntries below carries
277 context.getTermInformation().update(1, "test");
279 // Prepare the receiver's log
280 MockRaftActorContext.SimpleReplicatedLog log =
281 new MockRaftActorContext.SimpleReplicatedLog();
283 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
285 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
287 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
289 context.setReplicatedLog(log);
291 // Prepare the entries to be sent with AppendEntries
292 List<ReplicatedLogEntry> entries = new ArrayList<>();
294 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
296 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
298 // Send appendEntries with the same term as was set on the receiver
299 // before the new behavior was created (1 in this case)
300 // This will not work for a Candidate because as soon as a Candidate
301 // is created it increments the term
302 AppendEntries appendEntries =
303 new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
305 RaftActorBehavior behavior = createBehavior(context);
307 // Send an unknown message so that the state of the RaftActor remains unchanged
308 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
310 RaftActorBehavior raftBehavior =
311 behavior.handleMessage(getRef(), appendEntries);
// The behavior must be unchanged and the log must now end at index 4 (5 entries).
313 assertEquals(expected, raftBehavior);
314 assertEquals(5, log.last().getIndex() + 1);
315 assertNotNull(log.get(3));
316 assertNotNull(log.get(4));
318 // Also expect an AppendEntriesReply to be sent where success is true
319 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
320 "AppendEntriesReply") {
321 // do not put code outside this method, will run afterwards
323 protected Boolean match(Object in) {
324 if (in instanceof AppendEntriesReply) {
325 AppendEntriesReply reply = (AppendEntriesReply) in;
326 return reply.isSuccess();
333 assertEquals(true, out);
342 * This test verifies that when a new AppendEntries message is received with
343 * new entries and the logs of the sender and receiver are out-of-sync,
344 * the log is first corrected by removing the out of sync entries from the
345 * log and then adding in the new entries sent with the AppendEntries message
// Sends an AppendEntries whose entries (term 2, indices 2 and 3) conflict with the
// receiver's term-1 entry at index 2, then checks that the log ends at index 3, that
// index 2 now holds "two-1", index 3 holds "three", index 1 is untouched, and that the
// AppendEntriesReply reports success.
350 public void testHandleAppendEntriesCorrectReceiverLogEntries()
352 new JavaTestKit(getSystem()) {{
354 MockRaftActorContext context = createActorContext();
356 // Set the receiver's term to 2, the same term the AppendEntries below carries
357 context.getTermInformation().update(2, "test");
359 // Prepare the receiver's log
360 MockRaftActorContext.SimpleReplicatedLog log =
361 new MockRaftActorContext.SimpleReplicatedLog();
363 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
365 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
367 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
369 context.setReplicatedLog(log);
371 // Prepare the entries to be sent with AppendEntries
372 List<ReplicatedLogEntry> entries = new ArrayList<>();
374 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
376 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
378 // Send appendEntries with the same term as was set on the receiver
379 // before the new behavior was created (2 in this case)
380 // This will not work for a Candidate because as soon as a Candidate
381 // is created it increments the term
382 AppendEntries appendEntries =
383 new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
385 RaftActorBehavior behavior = createBehavior(context);
387 // Send an unknown message so that the state of the RaftActor remains unchanged
388 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
390 RaftActorBehavior raftBehavior =
391 behavior.handleMessage(getRef(), appendEntries);
393 assertEquals(expected, raftBehavior);
395 // The entry at index 2 will be found out-of-sync with the leader
396 // and will be removed
397 // Then the two new entries will be added to the log
398 // so the log ends up with 4 entries
399 assertEquals(4, log.last().getIndex() + 1);
400 assertNotNull(log.get(2));
// The entry at index 1 must be untouched
402 assertEquals("one", log.get(1).getData().toString());
404 // Check that the entry at index 2 has the new data
405 assertEquals("two-1", log.get(2).getData().toString());
407 assertEquals("three", log.get(3).getData().toString());
409 assertNotNull(log.get(3));
411 // Also expect an AppendEntriesReply to be sent where success is true
412 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
413 "AppendEntriesReply") {
414 // do not put code outside this method, will run afterwards
416 protected Boolean match(Object in) {
417 if (in instanceof AppendEntriesReply) {
418 AppendEntriesReply reply = (AppendEntriesReply) in;
419 return reply.isSuccess();
426 assertEquals(true, out);
// Sends an AppendEntries whose third argument is 3 (presumably prevLogIndex - confirm
// against AppendEntries) to a receiver whose log is prepared with indices 0..2 only, and
// checks that the behavior is unchanged and the AppendEntriesReply reports failure.
433 public void testHandleAppendEntriesPreviousLogEntryMissing(){
434 new JavaTestKit(getSystem()) {{
436 MockRaftActorContext context = createActorContext();
438 // Prepare the receiver's log
439 MockRaftActorContext.SimpleReplicatedLog log =
440 new MockRaftActorContext.SimpleReplicatedLog();
442 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
444 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
446 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
448 context.setReplicatedLog(log);
450 // Prepare the entries to be sent with AppendEntries
451 List<ReplicatedLogEntry> entries = new ArrayList<>();
453 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
455 AppendEntries appendEntries =
456 new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
458 RaftActorBehavior behavior = createBehavior(context);
460 // Send an unknown message so that the state of the RaftActor remains unchanged
461 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
463 RaftActorBehavior raftBehavior =
464 behavior.handleMessage(getRef(), appendEntries);
// Handling the AppendEntries must return the same behavior as the unknown message did.
466 assertEquals(expected, raftBehavior);
468 // Also expect an AppendEntriesReply to be sent where success is false
469 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
470 "AppendEntriesReply") {
471 // do not put code outside this method, will run afterwards
473 protected Boolean match(Object in) {
474 if (in instanceof AppendEntriesReply) {
475 AppendEntriesReply reply = (AppendEntriesReply) in;
476 return reply.isSuccess();
483 assertEquals(false, out);
// Uses a log with no visible entries but with snapshot index 3 and snapshot term 1, sends
// an AppendEntries whose third and fourth arguments are 3 and 1, and checks that the
// behavior is unchanged and the AppendEntriesReply reports success.
490 public void testHandleAppendAfterInstallingSnapshot(){
491 new JavaTestKit(getSystem()) {{
493 MockRaftActorContext context = createActorContext();
496 // Prepare the receiver's log
497 MockRaftActorContext.SimpleReplicatedLog log =
498 new MockRaftActorContext.SimpleReplicatedLog();
500 // Set up a log as if it has been snapshotted
501 log.setSnapshotIndex(3);
502 log.setSnapshotTerm(1);
504 context.setReplicatedLog(log);
506 // Prepare the entries to be sent with AppendEntries
507 List<ReplicatedLogEntry> entries = new ArrayList<>();
509 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
511 AppendEntries appendEntries =
512 new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
514 RaftActorBehavior behavior = createBehavior(context);
516 // Send an unknown message so that the state of the RaftActor remains unchanged
517 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
519 RaftActorBehavior raftBehavior =
520 behavior.handleMessage(getRef(), appendEntries);
// Handling the AppendEntries must return the same behavior as the unknown message did.
522 assertEquals(expected, raftBehavior);
524 // Also expect an AppendEntriesReply to be sent where success is true
525 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
526 "AppendEntriesReply") {
527 // do not put code outside this method, will run afterwards
529 protected Boolean match(Object in) {
530 if (in instanceof AppendEntriesReply) {
531 AppendEntriesReply reply = (AppendEntriesReply) in;
532 return reply.isSuccess();
539 assertEquals(true, out);
547 * This test verifies that when InstallSnapshot is received by
548 * the follower it is applied correctly.
// Serializes a three-entry map, feeds it to the follower as a series of InstallSnapshot
// messages and checks that (1) an ApplySnapshot with matching index/term values is
// observed, (2) the follower's collected chunks are reset to empty and (3) the leader
// actor collected exactly three InstallSnapshotReply messages.
553 public void testHandleInstallSnapshot() throws Exception {
554 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
// The leader is a MessageCollectorActor so the replies sent to it can be inspected later.
556 ActorRef leaderActor = getSystem().actorOf(Props.create(
557 MessageCollectorActor.class));
// The context is backed by this test kit's ref so ApplySnapshot can be received below.
559 MockRaftActorContext context = createActorContext(getRef());
561 Follower follower = (Follower)createBehavior(context);
563 HashMap<String, String> followerSnapshot = new HashMap<>();
564 followerSnapshot.put("1", "A");
565 followerSnapshot.put("2", "B");
566 followerSnapshot.put("3", "C");
568 ByteString bsSnapshot = toByteString(followerSnapshot);
569 ByteString chunkData = ByteString.EMPTY;
571 int snapshotLength = bsSnapshot.size();
// Loop body: send the chunk at the current offset, then advance the offset by 50.
576 chunkData = getNextChunk(bsSnapshot, offset);
577 final InstallSnapshot installSnapshot =
578 new InstallSnapshot(1, "leader-1", i, 1,
579 chunkData, chunkIndex, 3);
580 follower.handleMessage(leaderActor, installSnapshot);
581 offset = offset + 50;
584 } while ((offset+50) < snapshotLength);
// NOTE(review): in the visible lines this final message reuses the chunkData assigned in
// the last loop iteration instead of fetching the chunk at the current offset - confirm
// that this is intended.
586 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
587 follower.handleMessage(leaderActor, installSnapshot3);
// Collect messages for up to two seconds; each ApplySnapshot is mapped to "applySnapshot"
// or to a string naming the first field that does not match installSnapshot3.
589 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
591 protected String match(Object o) throws Exception {
592 if (o instanceof ApplySnapshot) {
593 ApplySnapshot as = (ApplySnapshot)o;
594 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
595 return "applySnapshot-lastIndex-mismatch";
597 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
598 return "applySnapshot-lastAppliedTerm-mismatch";
600 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
601 return "applySnapshot-lastAppliedIndex-mismatch";
603 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
604 return "applySnapshot-lastTerm-mismatch";
606 return "applySnapshot";
613 // Verify that after a snapshot is successfully applied the collected snapshot chunks are reset to empty
614 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
// Pick out the ApplySnapshot-derived result; it must be exactly "applySnapshot",
// i.e. none of the mismatch variants.
616 String applySnapshotMatch = "";
617 for (String reply: matches) {
618 if (reply.startsWith("applySnapshot")) {
619 applySnapshotMatch = reply;
623 assertEquals("applySnapshot", applySnapshotMatch);
// Fetch everything the leader actor collected and count the InstallSnapshotReply messages.
625 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
627 assertNotNull(messages);
628 assertTrue(messages instanceof List);
629 List<Object> listMessages = (List<Object>) messages;
631 int installSnapshotReplyReceivedCount = 0;
632 for (Object message: listMessages) {
633 if (message instanceof InstallSnapshotReply) {
634 ++installSnapshotReplyReceivedCount;
// Exactly three replies are expected.
638 assertEquals(3, installSnapshotReplyReceivedCount);
// Sends a single InstallSnapshot whose sixth argument is 3 (presumably the chunk index -
// confirm against InstallSnapshot) without any preceding chunks, and checks that the
// leader actor collected exactly one InstallSnapshotReply, that it reports failure with
// chunk index -1, and that the follower has collected no chunks.
644 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
645 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
// The leader is a MessageCollectorActor so the reply sent to it can be inspected later.
648 ActorRef leaderActor = getSystem().actorOf(Props.create(
649 MessageCollectorActor.class));
651 MockRaftActorContext context = createActorContext(getRef());
653 Follower follower = (Follower) createBehavior(context);
655 HashMap<String, String> followerSnapshot = new HashMap<>();
656 followerSnapshot.put("1", "A");
657 followerSnapshot.put("2", "B");
658 followerSnapshot.put("3", "C");
660 ByteString bsSnapshot = toByteString(followerSnapshot);
// The chunk is taken from offset 10 of the serialized snapshot.
662 final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
663 follower.handleMessage(leaderActor, installSnapshot);
// Fetch everything the leader actor collected and count the InstallSnapshotReply messages.
665 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
667 assertNotNull(messages);
668 assertTrue(messages instanceof List);
669 List<Object> listMessages = (List<Object>) messages;
671 int installSnapshotReplyReceivedCount = 0;
672 for (Object message: listMessages) {
673 if (message instanceof InstallSnapshotReply) {
674 ++installSnapshotReplyReceivedCount;
// Exactly one reply is expected; the cast below takes the first collected message to be that reply.
678 assertEquals(1, installSnapshotReplyReceivedCount);
679 InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
680 assertEquals(false, reply.isSuccess());
681 assertEquals(-1, reply.getChunkIndex());
682 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
// Returns whatever MessageCollectorActor.getAllMessages yields for the given actor.
// The message parameter is not used in the visible body.
688 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
689 return MessageCollectorActor.getAllMessages(actor);
// Returns the slice bs.substring(start, start + size), using 50 as the chunk-size
// threshold: size becomes the whole length when bs is shorter than 50, and the
// remaining length when fewer than 50 bytes are left after start.
// NOTE(review): the declarations of start and size, and whether the second check sits
// under an else, are not visible in this listing - verify the case where bs is shorter
// than 50 bytes and offset is non-zero.
692 public ByteString getNextChunk (ByteString bs, int offset){
693 int snapshotLength = bs.size();
696 if (50 > snapshotLength) {
697 size = snapshotLength;
699 if ((start + 50) > snapshotLength) {
700 size = snapshotLength - start;
703 return bs.substring(start, start + size);
// Serializes the given map with an ObjectOutputStream into a byte array and returns it
// wrapped in a ByteString. An IOException fails the test through Assert.fail.
706 private ByteString toByteString(Map<String, String> state) {
707 ByteArrayOutputStream b = null;
708 ObjectOutputStream o = null;
711 b = new ByteArrayOutputStream();
712 o = new ObjectOutputStream(b);
713 o.writeObject(state);
// NOTE(review): the bytes are read from b right after writeObject, with no flush of o in
// between - consider flushing o first so buffered output is guaranteed to be in b.
714 byte[] snapshotBytes = b.toByteArray();
715 return ByteString.copyFrom(snapshotBytes);
725 } catch (IOException e) {
726 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);