1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.Props;
8 import akka.testkit.JavaTestKit;
9 import com.google.protobuf.ByteString;
10 import java.io.ByteArrayOutputStream;
11 import java.io.IOException;
12 import java.io.ObjectOutputStream;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.List;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
36 private final ActorRef followerActor = getSystem().actorOf(Props.create(
37 DoNothingActor.class));
40 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
41 return new Follower(actorContext);
44 @Override protected RaftActorContext createActorContext() {
45 return createActorContext(followerActor);
48 protected RaftActorContext createActorContext(ActorRef actorRef){
49 return new MockRaftActorContext("test", getSystem(), actorRef);
53 public void testThatAnElectionTimeoutIsTriggered(){
54 new JavaTestKit(getSystem()) {{
56 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
57 protected void run() {
59 Follower follower = new Follower(createActorContext(getTestActor()));
61 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
62 // do not put code outside this method, will run afterwards
63 protected Boolean match(Object in) {
64 if (in instanceof ElectionTimeout) {
72 assertEquals(true, out);
79 public void testHandleElectionTimeout(){
80 RaftActorContext raftActorContext = createActorContext();
82 new Follower(raftActorContext);
84 RaftActorBehavior raftBehavior =
85 follower.handleMessage(followerActor, new ElectionTimeout());
87 assertTrue(raftBehavior instanceof Candidate);
91 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
92 new JavaTestKit(getSystem()) {{
94 new Within(duration("1 seconds")) {
95 protected void run() {
97 RaftActorContext context = createActorContext(getTestActor());
99 context.getTermInformation().update(1000, null);
101 RaftActorBehavior follower = createBehavior(context);
103 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
105 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
106 // do not put code outside this method, will run afterwards
107 protected Boolean match(Object in) {
108 if (in instanceof RequestVoteReply) {
109 RequestVoteReply reply = (RequestVoteReply) in;
110 return reply.isVoteGranted();
117 assertEquals(true, out);
124 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
125 new JavaTestKit(getSystem()) {{
127 new Within(duration("1 seconds")) {
128 protected void run() {
130 RaftActorContext context = createActorContext(getTestActor());
132 context.getTermInformation().update(1000, "test");
134 RaftActorBehavior follower = createBehavior(context);
136 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
138 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
139 // do not put code outside this method, will run afterwards
140 protected Boolean match(Object in) {
141 if (in instanceof RequestVoteReply) {
142 RequestVoteReply reply = (RequestVoteReply) in;
143 return reply.isVoteGranted();
150 assertEquals(false, out);
157 * This test verifies that when an AppendEntries RPC is received by a RaftActor
158 * with a commitIndex that is greater than what has been applied to the
159 * state machine of the RaftActor, the RaftActor applies the state and
160 * sets it current applied state to the commitIndex of the sender.
165 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
166 new JavaTestKit(getSystem()) {{
168 RaftActorContext context =
169 createActorContext();
171 context.setLastApplied(100);
172 setLastLogEntry((MockRaftActorContext) context, 1, 100,
173 new MockRaftActorContext.MockPayload(""));
174 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
176 List<ReplicatedLogEntry> entries =
178 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
179 new MockRaftActorContext.MockPayload("foo"))
182 // The new commitIndex is 101
183 AppendEntries appendEntries =
184 new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
186 RaftActorBehavior raftBehavior =
187 createBehavior(context).handleMessage(getRef(), appendEntries);
189 assertEquals(101L, context.getLastApplied());
195 * This test verifies that when an AppendEntries is received a specific prevLogTerm
196 * which does not match the term that is in RaftActors log entry at prevLogIndex
197 * then the RaftActor does not change it's state and it returns a failure.
202 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
204 new JavaTestKit(getSystem()) {{
206 MockRaftActorContext context = (MockRaftActorContext)
207 createActorContext();
209 // First set the receivers term to lower number
210 context.getTermInformation().update(95, "test");
212 // Set the last log entry term for the receiver to be greater than
213 // what we will be sending as the prevLogTerm in AppendEntries
214 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
215 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
217 // AppendEntries is now sent with a bigger term
218 // this will set the receivers term to be the same as the sender's term
219 AppendEntries appendEntries =
220 new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
222 RaftActorBehavior behavior = createBehavior(context);
224 // Send an unknown message so that the state of the RaftActor remains unchanged
225 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
227 RaftActorBehavior raftBehavior =
228 behavior.handleMessage(getRef(), appendEntries);
230 assertEquals(expected, raftBehavior);
232 // Also expect an AppendEntriesReply to be sent where success is false
233 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
234 "AppendEntriesReply") {
235 // do not put code outside this method, will run afterwards
236 protected Boolean match(Object in) {
237 if (in instanceof AppendEntriesReply) {
238 AppendEntriesReply reply = (AppendEntriesReply) in;
239 return reply.isSuccess();
246 assertEquals(false, out);
255 * This test verifies that when a new AppendEntries message is received with
256 * new entries and the logs of the sender and receiver match that the new
257 * entries get added to the log and the log is incremented by the number of
258 * entries received in appendEntries
263 public void testHandleAppendEntriesAddNewEntries() throws Exception {
264 new JavaTestKit(getSystem()) {{
266 MockRaftActorContext context = (MockRaftActorContext)
267 createActorContext();
269 // First set the receivers term to lower number
270 context.getTermInformation().update(1, "test");
272 // Prepare the receivers log
273 MockRaftActorContext.SimpleReplicatedLog log =
274 new MockRaftActorContext.SimpleReplicatedLog();
276 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
278 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
280 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
282 context.setReplicatedLog(log);
284 // Prepare the entries to be sent with AppendEntries
285 List<ReplicatedLogEntry> entries = new ArrayList<>();
287 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
289 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
291 // Send appendEntries with the same term as was set on the receiver
292 // before the new behavior was created (1 in this case)
293 // This will not work for a Candidate because as soon as a Candidate
294 // is created it increments the term
295 AppendEntries appendEntries =
296 new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
298 RaftActorBehavior behavior = createBehavior(context);
300 // Send an unknown message so that the state of the RaftActor remains unchanged
301 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
303 RaftActorBehavior raftBehavior =
304 behavior.handleMessage(getRef(), appendEntries);
306 assertEquals(expected, raftBehavior);
307 assertEquals(5, log.last().getIndex() + 1);
308 assertNotNull(log.get(3));
309 assertNotNull(log.get(4));
311 // Also expect an AppendEntriesReply to be sent where success is false
312 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
313 "AppendEntriesReply") {
314 // do not put code outside this method, will run afterwards
315 protected Boolean match(Object in) {
316 if (in instanceof AppendEntriesReply) {
317 AppendEntriesReply reply = (AppendEntriesReply) in;
318 return reply.isSuccess();
325 assertEquals(true, out);
334 * This test verifies that when a new AppendEntries message is received with
335 * new entries and the logs of the sender and receiver are out-of-sync that
336 * the log is first corrected by removing the out of sync entries from the
337 * log and then adding in the new entries sent with the AppendEntries message
342 public void testHandleAppendEntriesCorrectReceiverLogEntries()
344 new JavaTestKit(getSystem()) {{
346 MockRaftActorContext context = (MockRaftActorContext)
347 createActorContext();
349 // First set the receivers term to lower number
350 context.getTermInformation().update(2, "test");
352 // Prepare the receivers log
353 MockRaftActorContext.SimpleReplicatedLog log =
354 new MockRaftActorContext.SimpleReplicatedLog();
356 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
358 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
360 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
362 context.setReplicatedLog(log);
364 // Prepare the entries to be sent with AppendEntries
365 List<ReplicatedLogEntry> entries = new ArrayList<>();
367 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
369 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
371 // Send appendEntries with the same term as was set on the receiver
372 // before the new behavior was created (1 in this case)
373 // This will not work for a Candidate because as soon as a Candidate
374 // is created it increments the term
375 AppendEntries appendEntries =
376 new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
378 RaftActorBehavior behavior = createBehavior(context);
380 // Send an unknown message so that the state of the RaftActor remains unchanged
381 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
383 RaftActorBehavior raftBehavior =
384 behavior.handleMessage(getRef(), appendEntries);
386 assertEquals(expected, raftBehavior);
388 // The entry at index 2 will be found out-of-sync with the leader
389 // and will be removed
390 // Then the two new entries will be added to the log
391 // Thus making the log to have 4 entries
392 assertEquals(4, log.last().getIndex() + 1);
393 assertNotNull(log.get(2));
395 assertEquals("one", log.get(1).getData().toString());
397 // Check that the entry at index 2 has the new data
398 assertEquals("two-1", log.get(2).getData().toString());
400 assertEquals("three", log.get(3).getData().toString());
402 assertNotNull(log.get(3));
404 // Also expect an AppendEntriesReply to be sent where success is false
405 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
406 "AppendEntriesReply") {
407 // do not put code outside this method, will run afterwards
408 protected Boolean match(Object in) {
409 if (in instanceof AppendEntriesReply) {
410 AppendEntriesReply reply = (AppendEntriesReply) in;
411 return reply.isSuccess();
418 assertEquals(true, out);
425 public void testHandleAppendEntriesPreviousLogEntryMissing(){
426 new JavaTestKit(getSystem()) {{
428 MockRaftActorContext context = (MockRaftActorContext)
429 createActorContext();
431 // Prepare the receivers log
432 MockRaftActorContext.SimpleReplicatedLog log =
433 new MockRaftActorContext.SimpleReplicatedLog();
435 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
437 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
439 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
441 context.setReplicatedLog(log);
443 // Prepare the entries to be sent with AppendEntries
444 List<ReplicatedLogEntry> entries = new ArrayList<>();
446 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
448 AppendEntries appendEntries =
449 new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
451 RaftActorBehavior behavior = createBehavior(context);
453 // Send an unknown message so that the state of the RaftActor remains unchanged
454 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
456 RaftActorBehavior raftBehavior =
457 behavior.handleMessage(getRef(), appendEntries);
459 assertEquals(expected, raftBehavior);
461 // Also expect an AppendEntriesReply to be sent where success is false
462 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
463 "AppendEntriesReply") {
464 // do not put code outside this method, will run afterwards
465 protected Boolean match(Object in) {
466 if (in instanceof AppendEntriesReply) {
467 AppendEntriesReply reply = (AppendEntriesReply) in;
468 return reply.isSuccess();
475 assertEquals(false, out);
482 public void testHandleAppendAfterInstallingSnapshot(){
483 new JavaTestKit(getSystem()) {{
485 MockRaftActorContext context = (MockRaftActorContext)
486 createActorContext();
489 // Prepare the receivers log
490 MockRaftActorContext.SimpleReplicatedLog log =
491 new MockRaftActorContext.SimpleReplicatedLog();
493 // Set up a log as if it has been snapshotted
494 log.setSnapshotIndex(3);
495 log.setSnapshotTerm(1);
497 context.setReplicatedLog(log);
499 // Prepare the entries to be sent with AppendEntries
500 List<ReplicatedLogEntry> entries = new ArrayList<>();
502 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
504 AppendEntries appendEntries =
505 new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
507 RaftActorBehavior behavior = createBehavior(context);
509 // Send an unknown message so that the state of the RaftActor remains unchanged
510 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
512 RaftActorBehavior raftBehavior =
513 behavior.handleMessage(getRef(), appendEntries);
515 assertEquals(expected, raftBehavior);
517 // Also expect an AppendEntriesReply to be sent where success is false
518 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
519 "AppendEntriesReply") {
520 // do not put code outside this method, will run afterwards
521 protected Boolean match(Object in) {
522 if (in instanceof AppendEntriesReply) {
523 AppendEntriesReply reply = (AppendEntriesReply) in;
524 return reply.isSuccess();
531 assertEquals(true, out);
539 * This test verifies that when InstallSnapshot is received by
540 * the follower its applied correctly.
545 public void testHandleInstallSnapshot() throws Exception {
546 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
548 ActorRef leaderActor = getSystem().actorOf(Props.create(
549 MessageCollectorActor.class));
551 MockRaftActorContext context = (MockRaftActorContext)
552 createActorContext(getRef());
554 Follower follower = (Follower)createBehavior(context);
556 HashMap<String, String> followerSnapshot = new HashMap<>();
557 followerSnapshot.put("1", "A");
558 followerSnapshot.put("2", "B");
559 followerSnapshot.put("3", "C");
561 ByteString bsSnapshot = toByteString(followerSnapshot);
562 ByteString chunkData = ByteString.EMPTY;
564 int snapshotLength = bsSnapshot.size();
569 chunkData = getNextChunk(bsSnapshot, offset);
570 final InstallSnapshot installSnapshot =
571 new InstallSnapshot(1, "leader-1", i, 1,
572 chunkData, chunkIndex, 3);
573 follower.handleMessage(leaderActor, installSnapshot);
574 offset = offset + 50;
577 } while ((offset+50) < snapshotLength);
579 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
580 follower.handleMessage(leaderActor, installSnapshot3);
582 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
584 protected String match(Object o) throws Exception {
585 if (o instanceof ApplySnapshot) {
586 ApplySnapshot as = (ApplySnapshot)o;
587 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
588 return "applySnapshot-lastIndex-mismatch";
590 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
591 return "applySnapshot-lastAppliedTerm-mismatch";
593 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
594 return "applySnapshot-lastAppliedIndex-mismatch";
596 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
597 return "applySnapshot-lastTerm-mismatch";
599 return "applySnapshot";
606 // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
607 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
609 String applySnapshotMatch = "";
610 for (String reply: matches) {
611 if (reply.startsWith("applySnapshot")) {
612 applySnapshotMatch = reply;
616 assertEquals("applySnapshot", applySnapshotMatch);
618 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
620 assertNotNull(messages);
621 assertTrue(messages instanceof List);
622 List<Object> listMessages = (List<Object>) messages;
624 int installSnapshotReplyReceivedCount = 0;
625 for (Object message: listMessages) {
626 if (message instanceof InstallSnapshotReply) {
627 ++installSnapshotReplyReceivedCount;
631 assertEquals(3, installSnapshotReplyReceivedCount);
637 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
638 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
641 ActorRef leaderActor = getSystem().actorOf(Props.create(
642 MessageCollectorActor.class));
644 MockRaftActorContext context = (MockRaftActorContext)
645 createActorContext(getRef());
647 Follower follower = (Follower) createBehavior(context);
649 HashMap<String, String> followerSnapshot = new HashMap<>();
650 followerSnapshot.put("1", "A");
651 followerSnapshot.put("2", "B");
652 followerSnapshot.put("3", "C");
654 ByteString bsSnapshot = toByteString(followerSnapshot);
656 final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
657 follower.handleMessage(leaderActor, installSnapshot);
659 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
661 assertNotNull(messages);
662 assertTrue(messages instanceof List);
663 List<Object> listMessages = (List<Object>) messages;
665 int installSnapshotReplyReceivedCount = 0;
666 for (Object message: listMessages) {
667 if (message instanceof InstallSnapshotReply) {
668 ++installSnapshotReplyReceivedCount;
672 assertEquals(1, installSnapshotReplyReceivedCount);
673 InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
674 assertEquals(false, reply.isSuccess());
675 assertEquals(-1, reply.getChunkIndex());
676 assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
682 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
683 return MessageCollectorActor.getAllMessages(actor);
686 public ByteString getNextChunk (ByteString bs, int offset){
687 int snapshotLength = bs.size();
690 if (50 > snapshotLength) {
691 size = snapshotLength;
693 if ((start + 50) > snapshotLength) {
694 size = snapshotLength - start;
697 return bs.substring(start, start + size);
700 private ByteString toByteString(Map<String, String> state) {
701 ByteArrayOutputStream b = null;
702 ObjectOutputStream o = null;
705 b = new ByteArrayOutputStream();
706 o = new ObjectOutputStream(b);
707 o.writeObject(state);
708 byte[] snapshotBytes = b.toByteArray();
709 return ByteString.copyFrom(snapshotBytes);
719 } catch (IOException e) {
720 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);