1 package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.protobuf.ByteString;
import junit.framework.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
37 public class FollowerTest extends AbstractRaftActorBehaviorTest {
39 private final ActorRef followerActor = getSystem().actorOf(Props.create(
40 DoNothingActor.class));
43 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
44 return new Follower(actorContext);
47 @Override protected RaftActorContext createActorContext() {
48 return createActorContext(followerActor);
51 protected RaftActorContext createActorContext(ActorRef actorRef){
52 return new MockRaftActorContext("test", getSystem(), actorRef);
56 public void testThatAnElectionTimeoutIsTriggered(){
57 new JavaTestKit(getSystem()) {{
59 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
60 protected void run() {
62 Follower follower = new Follower(createActorContext(getTestActor()));
64 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
65 // do not put code outside this method, will run afterwards
66 protected Boolean match(Object in) {
67 if (in instanceof ElectionTimeout) {
75 assertEquals(true, out);
82 public void testHandleElectionTimeout(){
83 RaftActorContext raftActorContext = createActorContext();
85 new Follower(raftActorContext);
87 RaftActorBehavior raftBehavior =
88 follower.handleMessage(followerActor, new ElectionTimeout());
90 Assert.assertTrue(raftBehavior instanceof Candidate);
94 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
95 new JavaTestKit(getSystem()) {{
97 new Within(duration("1 seconds")) {
98 protected void run() {
100 RaftActorContext context = createActorContext(getTestActor());
102 context.getTermInformation().update(1000, null);
104 RaftActorBehavior follower = createBehavior(context);
106 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
108 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
109 // do not put code outside this method, will run afterwards
110 protected Boolean match(Object in) {
111 if (in instanceof RequestVoteReply) {
112 RequestVoteReply reply = (RequestVoteReply) in;
113 return reply.isVoteGranted();
120 assertEquals(true, out);
127 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
128 new JavaTestKit(getSystem()) {{
130 new Within(duration("1 seconds")) {
131 protected void run() {
133 RaftActorContext context = createActorContext(getTestActor());
135 context.getTermInformation().update(1000, "test");
137 RaftActorBehavior follower = createBehavior(context);
139 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
141 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
142 // do not put code outside this method, will run afterwards
143 protected Boolean match(Object in) {
144 if (in instanceof RequestVoteReply) {
145 RequestVoteReply reply = (RequestVoteReply) in;
146 return reply.isVoteGranted();
153 assertEquals(false, out);
160 * This test verifies that when an AppendEntries RPC is received by a RaftActor
161 * with a commitIndex that is greater than what has been applied to the
162 * state machine of the RaftActor, the RaftActor applies the state and
163 * sets it current applied state to the commitIndex of the sender.
168 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
169 new JavaTestKit(getSystem()) {{
171 RaftActorContext context =
172 createActorContext();
174 context.setLastApplied(100);
175 setLastLogEntry((MockRaftActorContext) context, 1, 100,
176 new MockRaftActorContext.MockPayload(""));
177 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
179 List<ReplicatedLogEntry> entries =
181 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
182 new MockRaftActorContext.MockPayload("foo"))
185 // The new commitIndex is 101
186 AppendEntries appendEntries =
187 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
189 RaftActorBehavior raftBehavior =
190 createBehavior(context).handleMessage(getRef(), appendEntries);
192 assertEquals(101L, context.getLastApplied());
198 * This test verifies that when an AppendEntries is received a specific prevLogTerm
199 * which does not match the term that is in RaftActors log entry at prevLogIndex
200 * then the RaftActor does not change it's state and it returns a failure.
205 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
207 new JavaTestKit(getSystem()) {{
209 MockRaftActorContext context = (MockRaftActorContext)
210 createActorContext();
212 // First set the receivers term to lower number
213 context.getTermInformation().update(95, "test");
215 // Set the last log entry term for the receiver to be greater than
216 // what we will be sending as the prevLogTerm in AppendEntries
217 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
218 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
220 // AppendEntries is now sent with a bigger term
221 // this will set the receivers term to be the same as the sender's term
222 AppendEntries appendEntries =
223 new AppendEntries(100, "leader-1", 0, 0, null, 101);
225 RaftActorBehavior behavior = createBehavior(context);
227 // Send an unknown message so that the state of the RaftActor remains unchanged
228 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
230 RaftActorBehavior raftBehavior =
231 behavior.handleMessage(getRef(), appendEntries);
233 assertEquals(expected, raftBehavior);
235 // Also expect an AppendEntriesReply to be sent where success is false
236 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
237 "AppendEntriesReply") {
238 // do not put code outside this method, will run afterwards
239 protected Boolean match(Object in) {
240 if (in instanceof AppendEntriesReply) {
241 AppendEntriesReply reply = (AppendEntriesReply) in;
242 return reply.isSuccess();
249 assertEquals(false, out);
258 * This test verifies that when a new AppendEntries message is received with
259 * new entries and the logs of the sender and receiver match that the new
260 * entries get added to the log and the log is incremented by the number of
261 * entries received in appendEntries
266 public void testHandleAppendEntriesAddNewEntries() throws Exception {
267 new JavaTestKit(getSystem()) {{
269 MockRaftActorContext context = (MockRaftActorContext)
270 createActorContext();
272 // First set the receivers term to lower number
273 context.getTermInformation().update(1, "test");
275 // Prepare the receivers log
276 MockRaftActorContext.SimpleReplicatedLog log =
277 new MockRaftActorContext.SimpleReplicatedLog();
279 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
281 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
283 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
285 context.setReplicatedLog(log);
287 // Prepare the entries to be sent with AppendEntries
288 List<ReplicatedLogEntry> entries = new ArrayList<>();
290 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
292 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
294 // Send appendEntries with the same term as was set on the receiver
295 // before the new behavior was created (1 in this case)
296 // This will not work for a Candidate because as soon as a Candidate
297 // is created it increments the term
298 AppendEntries appendEntries =
299 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
301 RaftActorBehavior behavior = createBehavior(context);
303 // Send an unknown message so that the state of the RaftActor remains unchanged
304 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
306 RaftActorBehavior raftBehavior =
307 behavior.handleMessage(getRef(), appendEntries);
309 assertEquals(expected, raftBehavior);
310 assertEquals(5, log.last().getIndex() + 1);
311 assertNotNull(log.get(3));
312 assertNotNull(log.get(4));
314 // Also expect an AppendEntriesReply to be sent where success is false
315 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
316 "AppendEntriesReply") {
317 // do not put code outside this method, will run afterwards
318 protected Boolean match(Object in) {
319 if (in instanceof AppendEntriesReply) {
320 AppendEntriesReply reply = (AppendEntriesReply) in;
321 return reply.isSuccess();
328 assertEquals(true, out);
337 * This test verifies that when a new AppendEntries message is received with
338 * new entries and the logs of the sender and receiver are out-of-sync that
339 * the log is first corrected by removing the out of sync entries from the
340 * log and then adding in the new entries sent with the AppendEntries message
345 public void testHandleAppendEntriesCorrectReceiverLogEntries()
347 new JavaTestKit(getSystem()) {{
349 MockRaftActorContext context = (MockRaftActorContext)
350 createActorContext();
352 // First set the receivers term to lower number
353 context.getTermInformation().update(2, "test");
355 // Prepare the receivers log
356 MockRaftActorContext.SimpleReplicatedLog log =
357 new MockRaftActorContext.SimpleReplicatedLog();
359 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
361 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
363 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
365 context.setReplicatedLog(log);
367 // Prepare the entries to be sent with AppendEntries
368 List<ReplicatedLogEntry> entries = new ArrayList<>();
370 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
372 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
374 // Send appendEntries with the same term as was set on the receiver
375 // before the new behavior was created (1 in this case)
376 // This will not work for a Candidate because as soon as a Candidate
377 // is created it increments the term
378 AppendEntries appendEntries =
379 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
381 RaftActorBehavior behavior = createBehavior(context);
383 // Send an unknown message so that the state of the RaftActor remains unchanged
384 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
386 RaftActorBehavior raftBehavior =
387 behavior.handleMessage(getRef(), appendEntries);
389 assertEquals(expected, raftBehavior);
391 // The entry at index 2 will be found out-of-sync with the leader
392 // and will be removed
393 // Then the two new entries will be added to the log
394 // Thus making the log to have 4 entries
395 assertEquals(4, log.last().getIndex() + 1);
396 assertNotNull(log.get(2));
398 assertEquals("one", log.get(1).getData().toString());
400 // Check that the entry at index 2 has the new data
401 assertEquals("two-1", log.get(2).getData().toString());
403 assertEquals("three", log.get(3).getData().toString());
405 assertNotNull(log.get(3));
407 // Also expect an AppendEntriesReply to be sent where success is false
408 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
409 "AppendEntriesReply") {
410 // do not put code outside this method, will run afterwards
411 protected Boolean match(Object in) {
412 if (in instanceof AppendEntriesReply) {
413 AppendEntriesReply reply = (AppendEntriesReply) in;
414 return reply.isSuccess();
421 assertEquals(true, out);
// Test overview: feeds a Follower several InstallSnapshot messages, then checks
// (a) that an ApplySnapshot consistent with the last InstallSnapshot shows up at
// the test kit, and (b) that the collecting "leader" actor recorded exactly
// three InstallSnapshotReply messages.
// NOTE(review): this listing is missing some short lines (e.g. the declarations
// of `offset` and `i`, the loop opener, closing braces); the comments below
// describe only what the visible lines establish.
429 * This test verifies that when InstallSnapshot is received by
430 * the follower its applied correctly.
435 public void testHandleInstallSnapshot() throws Exception {
436 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
// The "leader" is a MessageCollectorActor; it is passed as the sender of every
// InstallSnapshot below and is queried for its recorded messages at the end.
438 ActorRef leaderActor = getSystem().actorOf(Props.create(
439 MessageCollectorActor.class));
// The follower's context is built around the test kit's own ref (getRef()).
441 MockRaftActorContext context = (MockRaftActorContext)
442 createActorContext(getRef());
444 Follower follower = (Follower)createBehavior(context);
// Snapshot payload: a three-entry map turned into a ByteString by toByteString.
446 HashMap<String, String> followerSnapshot = new HashMap<>();
447 followerSnapshot.put("1", "A");
448 followerSnapshot.put("2", "B");
449 followerSnapshot.put("3", "C");
451 ByteString bsSnapshot = toByteString(followerSnapshot);
452 ByteString chunkData = ByteString.EMPTY;
454 int snapshotLength = bsSnapshot.size();
// Chunk loop: each pass takes the next chunk at `offset`, sends it to the
// follower, and advances `offset` by 50.
458 chunkData = getNextChunk(bsSnapshot, offset);
459 final InstallSnapshot installSnapshot =
460 new InstallSnapshot(1, "leader-1", i, 1,
462 follower.handleMessage(leaderActor, installSnapshot);
463 offset = offset + 50;
// The loop stops once fewer than 50 bytes would remain after the next offset.
465 } while ((offset+50) < snapshotLength);
// NOTE(review): chunkData is not recomputed here, so this message appears to
// carry the same bytes as the last loop iteration — confirm this is intended.
467 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
468 follower.handleMessage(leaderActor, installSnapshot3);
// Map each message received by the kit to a String tag; an ApplySnapshot is
// compared field by field against installSnapshot3's lastIncludedIndex/Term
// and tagged with the first mismatch found, or plain "applySnapshot" if none.
470 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
472 protected String match(Object o) throws Exception {
473 if (o instanceof ApplySnapshot) {
474 ApplySnapshot as = (ApplySnapshot)o;
475 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
476 return "applySnapshot-lastIndex-mismatch";
478 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
479 return "applySnapshot-lastAppliedTerm-mismatch";
481 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
482 return "applySnapshot-lastAppliedIndex-mismatch";
484 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
485 return "applySnapshot-lastTerm-mismatch";
487 return "applySnapshot";
// Keep the last tag that starts with "applySnapshot"; the assertion passes
// only when that tag has no "-mismatch" suffix.
494 String applySnapshotMatch = "";
495 for (String reply: matches) {
496 if (reply.startsWith("applySnapshot")) {
497 applySnapshotMatch = reply;
501 assertEquals("applySnapshot", applySnapshotMatch);
// executeLocalOperation ignores its message argument and simply returns
// MessageCollectorActor.getAllMessages(leaderActor).
503 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
505 assertNotNull(messages);
506 assertTrue(messages instanceof List);
// Unchecked cast; guarded only by the instanceof assertion above.
507 List<Object> listMessages = (List<Object>) messages;
// Count the InstallSnapshotReply messages the leader actor collected.
509 int installSnapshotReplyReceivedCount = 0;
510 for (Object message: listMessages) {
511 if (message instanceof InstallSnapshotReply) {
512 ++installSnapshotReplyReceivedCount;
// Three replies are expected in total.
516 assertEquals(3, installSnapshotReplyReceivedCount);
521 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
522 return MessageCollectorActor.getAllMessages(actor);
525 public ByteString getNextChunk (ByteString bs, int offset){
526 int snapshotLength = bs.size();
529 if (50 > snapshotLength) {
530 size = snapshotLength;
532 if ((start + 50) > snapshotLength) {
533 size = snapshotLength - start;
536 return bs.substring(start, start + size);
539 private ByteString toByteString(Map<String, String> state) {
540 ByteArrayOutputStream b = null;
541 ObjectOutputStream o = null;
544 b = new ByteArrayOutputStream();
545 o = new ObjectOutputStream(b);
546 o.writeObject(state);
547 byte[] snapshotBytes = b.toByteArray();
548 return ByteString.copyFrom(snapshotBytes);
558 } catch (IOException e) {
559 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);