1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import akka.util.Timeout;
7 import com.google.protobuf.ByteString;
8 import junit.framework.Assert;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
12 import org.opendaylight.controller.cluster.raft.RaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftState;
14 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
15 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
19 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
20 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
23 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
24 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28 import scala.concurrent.duration.FiniteDuration;
30 import java.io.ByteArrayOutputStream;
31 import java.io.IOException;
32 import java.io.ObjectOutputStream;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashMap;
36 import java.util.List;
38 import java.util.concurrent.TimeUnit;
40 import static akka.pattern.Patterns.ask;
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertNotNull;
43 import static org.junit.Assert.assertTrue;
45 public class FollowerTest extends AbstractRaftActorBehaviorTest {
47 private final ActorRef followerActor = getSystem().actorOf(Props.create(
48 DoNothingActor.class));
51 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
52 return new Follower(actorContext);
55 @Override protected RaftActorContext createActorContext() {
56 return createActorContext(followerActor);
59 protected RaftActorContext createActorContext(ActorRef actorRef){
60 return new MockRaftActorContext("test", getSystem(), actorRef);
64 public void testThatAnElectionTimeoutIsTriggered(){
65 new JavaTestKit(getSystem()) {{
67 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
68 protected void run() {
70 Follower follower = new Follower(createActorContext(getTestActor()));
72 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
73 // do not put code outside this method, will run afterwards
74 protected Boolean match(Object in) {
75 if (in instanceof ElectionTimeout) {
83 assertEquals(true, out);
90 public void testHandleElectionTimeout(){
91 RaftActorContext raftActorContext = createActorContext();
93 new Follower(raftActorContext);
96 follower.handleMessage(followerActor, new ElectionTimeout());
98 Assert.assertEquals(RaftState.Candidate, raftState);
102 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
103 new JavaTestKit(getSystem()) {{
105 new Within(duration("1 seconds")) {
106 protected void run() {
108 RaftActorContext context = createActorContext(getTestActor());
110 context.getTermInformation().update(1000, null);
112 RaftActorBehavior follower = createBehavior(context);
114 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
116 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
117 // do not put code outside this method, will run afterwards
118 protected Boolean match(Object in) {
119 if (in instanceof RequestVoteReply) {
120 RequestVoteReply reply = (RequestVoteReply) in;
121 return reply.isVoteGranted();
128 assertEquals(true, out);
135 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
136 new JavaTestKit(getSystem()) {{
138 new Within(duration("1 seconds")) {
139 protected void run() {
141 RaftActorContext context = createActorContext(getTestActor());
143 context.getTermInformation().update(1000, "test");
145 RaftActorBehavior follower = createBehavior(context);
147 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
149 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
150 // do not put code outside this method, will run afterwards
151 protected Boolean match(Object in) {
152 if (in instanceof RequestVoteReply) {
153 RequestVoteReply reply = (RequestVoteReply) in;
154 return reply.isVoteGranted();
161 assertEquals(false, out);
168 * This test verifies that when an AppendEntries RPC is received by a RaftActor
169 * with a commitIndex that is greater than what has been applied to the
170 * state machine of the RaftActor, the RaftActor applies the state and
171 * sets it current applied state to the commitIndex of the sender.
176 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
177 new JavaTestKit(getSystem()) {{
179 RaftActorContext context =
180 createActorContext();
182 context.setLastApplied(100);
183 setLastLogEntry((MockRaftActorContext) context, 1, 100,
184 new MockRaftActorContext.MockPayload(""));
185 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
187 List<ReplicatedLogEntry> entries =
189 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
190 new MockRaftActorContext.MockPayload("foo"))
193 // The new commitIndex is 101
194 AppendEntries appendEntries =
195 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
197 RaftState raftState =
198 createBehavior(context).handleMessage(getRef(), appendEntries);
200 assertEquals(101L, context.getLastApplied());
206 * This test verifies that when an AppendEntries is received a specific prevLogTerm
207 * which does not match the term that is in RaftActors log entry at prevLogIndex
208 * then the RaftActor does not change it's state and it returns a failure.
213 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
215 new JavaTestKit(getSystem()) {{
217 MockRaftActorContext context = (MockRaftActorContext)
218 createActorContext();
220 // First set the receivers term to lower number
221 context.getTermInformation().update(95, "test");
223 // Set the last log entry term for the receiver to be greater than
224 // what we will be sending as the prevLogTerm in AppendEntries
225 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
226 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
228 // AppendEntries is now sent with a bigger term
229 // this will set the receivers term to be the same as the sender's term
230 AppendEntries appendEntries =
231 new AppendEntries(100, "leader-1", 0, 0, null, 101);
233 RaftActorBehavior behavior = createBehavior(context);
235 // Send an unknown message so that the state of the RaftActor remains unchanged
236 RaftState expected = behavior.handleMessage(getRef(), "unknown");
238 RaftState raftState =
239 behavior.handleMessage(getRef(), appendEntries);
241 assertEquals(expected, raftState);
243 // Also expect an AppendEntriesReply to be sent where success is false
244 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
245 "AppendEntriesReply") {
246 // do not put code outside this method, will run afterwards
247 protected Boolean match(Object in) {
248 if (in instanceof AppendEntriesReply) {
249 AppendEntriesReply reply = (AppendEntriesReply) in;
250 return reply.isSuccess();
257 assertEquals(false, out);
266 * This test verifies that when a new AppendEntries message is received with
267 * new entries and the logs of the sender and receiver match that the new
268 * entries get added to the log and the log is incremented by the number of
269 * entries received in appendEntries
274 public void testHandleAppendEntriesAddNewEntries() throws Exception {
275 new JavaTestKit(getSystem()) {{
277 MockRaftActorContext context = (MockRaftActorContext)
278 createActorContext();
280 // First set the receivers term to lower number
281 context.getTermInformation().update(1, "test");
283 // Prepare the receivers log
284 MockRaftActorContext.SimpleReplicatedLog log =
285 new MockRaftActorContext.SimpleReplicatedLog();
287 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
289 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
291 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
293 context.setReplicatedLog(log);
295 // Prepare the entries to be sent with AppendEntries
296 List<ReplicatedLogEntry> entries = new ArrayList<>();
298 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
300 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
302 // Send appendEntries with the same term as was set on the receiver
303 // before the new behavior was created (1 in this case)
304 // This will not work for a Candidate because as soon as a Candidate
305 // is created it increments the term
306 AppendEntries appendEntries =
307 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
309 RaftActorBehavior behavior = createBehavior(context);
311 // Send an unknown message so that the state of the RaftActor remains unchanged
312 RaftState expected = behavior.handleMessage(getRef(), "unknown");
314 RaftState raftState =
315 behavior.handleMessage(getRef(), appendEntries);
317 assertEquals(expected, raftState);
318 assertEquals(5, log.last().getIndex() + 1);
319 assertNotNull(log.get(3));
320 assertNotNull(log.get(4));
322 // Also expect an AppendEntriesReply to be sent where success is false
323 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
324 "AppendEntriesReply") {
325 // do not put code outside this method, will run afterwards
326 protected Boolean match(Object in) {
327 if (in instanceof AppendEntriesReply) {
328 AppendEntriesReply reply = (AppendEntriesReply) in;
329 return reply.isSuccess();
336 assertEquals(true, out);
345 * This test verifies that when a new AppendEntries message is received with
346 * new entries and the logs of the sender and receiver are out-of-sync that
347 * the log is first corrected by removing the out of sync entries from the
348 * log and then adding in the new entries sent with the AppendEntries message
353 public void testHandleAppendEntriesCorrectReceiverLogEntries()
355 new JavaTestKit(getSystem()) {{
357 MockRaftActorContext context = (MockRaftActorContext)
358 createActorContext();
360 // First set the receivers term to lower number
361 context.getTermInformation().update(2, "test");
363 // Prepare the receivers log
364 MockRaftActorContext.SimpleReplicatedLog log =
365 new MockRaftActorContext.SimpleReplicatedLog();
367 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
369 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
371 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
373 context.setReplicatedLog(log);
375 // Prepare the entries to be sent with AppendEntries
376 List<ReplicatedLogEntry> entries = new ArrayList<>();
378 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
380 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
382 // Send appendEntries with the same term as was set on the receiver
383 // before the new behavior was created (1 in this case)
384 // This will not work for a Candidate because as soon as a Candidate
385 // is created it increments the term
386 AppendEntries appendEntries =
387 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
389 RaftActorBehavior behavior = createBehavior(context);
391 // Send an unknown message so that the state of the RaftActor remains unchanged
392 RaftState expected = behavior.handleMessage(getRef(), "unknown");
394 RaftState raftState =
395 behavior.handleMessage(getRef(), appendEntries);
397 assertEquals(expected, raftState);
399 // The entry at index 2 will be found out-of-sync with the leader
400 // and will be removed
401 // Then the two new entries will be added to the log
402 // Thus making the log to have 4 entries
403 assertEquals(4, log.last().getIndex() + 1);
404 assertNotNull(log.get(2));
406 assertEquals("one", log.get(1).getData().toString());
408 // Check that the entry at index 2 has the new data
409 assertEquals("two-1", log.get(2).getData().toString());
411 assertEquals("three", log.get(3).getData().toString());
413 assertNotNull(log.get(3));
415 // Also expect an AppendEntriesReply to be sent where success is false
416 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
417 "AppendEntriesReply") {
418 // do not put code outside this method, will run afterwards
419 protected Boolean match(Object in) {
420 if (in instanceof AppendEntriesReply) {
421 AppendEntriesReply reply = (AppendEntriesReply) in;
422 return reply.isSuccess();
429 assertEquals(true, out);
437 * This test verifies that when InstallSnapshot is received by
438 * the follower its applied correctly.
443 public void testHandleInstallSnapshot() throws Exception {
444 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
446 ActorRef leaderActor = getSystem().actorOf(Props.create(
447 MessageCollectorActor.class));
449 MockRaftActorContext context = (MockRaftActorContext)
450 createActorContext(getRef());
452 Follower follower = (Follower)createBehavior(context);
454 HashMap<String, String> followerSnapshot = new HashMap<>();
455 followerSnapshot.put("1", "A");
456 followerSnapshot.put("2", "B");
457 followerSnapshot.put("3", "C");
459 ByteString bsSnapshot = toByteString(followerSnapshot);
460 ByteString chunkData = ByteString.EMPTY;
462 int snapshotLength = bsSnapshot.size();
466 chunkData = getNextChunk(bsSnapshot, offset);
467 final InstallSnapshot installSnapshot =
468 new InstallSnapshot(1, "leader-1", i, 1,
470 follower.handleMessage(leaderActor, installSnapshot);
471 offset = offset + 50;
473 } while ((offset+50) < snapshotLength);
475 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
476 follower.handleMessage(leaderActor, installSnapshot3);
478 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
480 protected String match(Object o) throws Exception {
481 if (o instanceof ApplySnapshot) {
482 ApplySnapshot as = (ApplySnapshot)o;
483 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
484 return "applySnapshot-lastIndex-mismatch";
486 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
487 return "applySnapshot-lastAppliedTerm-mismatch";
489 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
490 return "applySnapshot-lastAppliedIndex-mismatch";
492 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
493 return "applySnapshot-lastTerm-mismatch";
495 return "applySnapshot";
502 String applySnapshotMatch = "";
503 for (String reply: matches) {
504 if (reply.startsWith("applySnapshot")) {
505 applySnapshotMatch = reply;
509 assertEquals("applySnapshot", applySnapshotMatch);
511 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
513 assertNotNull(messages);
514 assertTrue(messages instanceof List);
515 List<Object> listMessages = (List<Object>) messages;
517 int installSnapshotReplyReceivedCount = 0;
518 for (Object message: listMessages) {
519 if (message instanceof InstallSnapshotReply) {
520 ++installSnapshotReplyReceivedCount;
524 assertEquals(3, installSnapshotReplyReceivedCount);
529 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
530 FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
531 Timeout operationTimeout = new Timeout(operationDuration);
532 Future<Object> future = ask(actor, message, operationTimeout);
535 return Await.result(future, operationDuration);
536 } catch (Exception e) {
541 public ByteString getNextChunk (ByteString bs, int offset){
542 int snapshotLength = bs.size();
545 if (50 > snapshotLength) {
546 size = snapshotLength;
548 if ((start + 50) > snapshotLength) {
549 size = snapshotLength - start;
552 return bs.substring(start, start + size);
555 private ByteString toByteString(Map<String, String> state) {
556 ByteArrayOutputStream b = null;
557 ObjectOutputStream o = null;
560 b = new ByteArrayOutputStream();
561 o = new ObjectOutputStream(b);
562 o.writeObject(state);
563 byte[] snapshotBytes = b.toByteArray();
564 return ByteString.copyFrom(snapshotBytes);
574 } catch (IOException e) {
575 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);