1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.protobuf.ByteString;
7 import junit.framework.Assert;
9 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
10 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
11 import org.opendaylight.controller.cluster.raft.RaftActorContext;
12 import org.opendaylight.controller.cluster.raft.RaftState;
13 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
19 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
22 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectOutputStream;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.HashMap;
31 import java.util.List;
34 import static org.junit.Assert.assertEquals;
35 import static org.junit.Assert.assertNotNull;
36 import static org.junit.Assert.assertTrue;
38 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Default actor for these tests, created from DoNothingActor props. It backs the
// context returned by the no-argument createActorContext() and is the ActorRef
// passed to handleMessage in testHandleElectionTimeout.
40 private final ActorRef followerActor = getSystem().actorOf(Props.create(
41 DoNothingActor.class));
// Supplies the behavior under test: a Follower constructed with the given context.
44 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
45 return new Follower(actorContext);
// Builds the default context for these tests by delegating to the ActorRef
// overload with followerActor.
48 @Override protected RaftActorContext createActorContext() {
49 return createActorContext(followerActor);
// Builds a MockRaftActorContext around the given actor, using this test's actor system.
// NOTE(review): the first constructor argument "test" is presumably the member id —
// confirm against MockRaftActorContext.
52 protected RaftActorContext createActorContext(ActorRef actorRef){
53 return new MockRaftActorContext("test", getSystem(), actorRef);
// Checks that after a Follower is constructed an ElectionTimeout message reaches
// the test kit's actor. Both the enclosing Within block and the ExpectMsg use a
// window of six HEART_BEAT_INTERVALs.
57 public void testThatAnElectionTimeoutIsTriggered(){
58 new JavaTestKit(getSystem()) {{
60 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
61 protected void run() {
// The context is built around getTestActor(), presumably so that the timeout the
// Follower schedules is delivered to this test kit — confirm against Follower.
63 Follower follower = new Follower(createActorContext(getTestActor()));
65 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
66 // do not put code outside this method, will run afterwards
67 protected Boolean match(Object in) {
68 if (in instanceof ElectionTimeout) {
// The test passes only if the matcher produced true.
76 assertEquals(true, out);
// Checks that a Follower handed an ElectionTimeout reports RaftState.Candidate.
// NOTE(review): this assertion goes through junit.framework.Assert, whereas the
// other tests in this file use the statically imported org.junit.Assert.assertEquals.
83 public void testHandleElectionTimeout(){
84 RaftActorContext raftActorContext = createActorContext();
86 new Follower(raftActorContext);
// followerActor is passed as the first argument of handleMessage.
89 follower.handleMessage(followerActor, new ElectionTimeout());
// raftState is presumably the value returned by handleMessage above; its
// declaration is not visible in this view — confirm.
91 Assert.assertEquals(RaftState.Candidate, raftState);
// Checks that a RequestVote carrying the same term value (1000) that was stored in
// the context's term information, with null stored alongside it (votedFor, per the
// test name), is answered with a RequestVoteReply whose isVoteGranted() is true.
95 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96 new JavaTestKit(getSystem()) {{
98 new Within(duration("1 seconds")) {
99 protected void run() {
// The context wraps getTestActor(), and the same actor is passed to handleMessage.
101 RaftActorContext context = createActorContext(getTestActor());
103 context.getTermInformation().update(1000, null);
105 RaftActorBehavior follower = createBehavior(context);
107 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
109 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
110 // do not put code outside this method, will run afterwards
111 protected Boolean match(Object in) {
112 if (in instanceof RequestVoteReply) {
113 RequestVoteReply reply = (RequestVoteReply) in;
// Surface the reply's vote decision as the ExpectMsg result.
114 return reply.isVoteGranted();
121 assertEquals(true, out);
// Checks that a RequestVote from "candidate" with term 1000 is answered with a
// RequestVoteReply whose isVoteGranted() is false when the context's term
// information was updated with (1000, "test") — i.e. the stored second value
// (votedFor, per the test name) differs from the requesting candidate id.
128 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
129 new JavaTestKit(getSystem()) {{
131 new Within(duration("1 seconds")) {
132 protected void run() {
134 RaftActorContext context = createActorContext(getTestActor());
136 context.getTermInformation().update(1000, "test");
138 RaftActorBehavior follower = createBehavior(context);
140 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
142 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
143 // do not put code outside this method, will run afterwards
144 protected Boolean match(Object in) {
145 if (in instanceof RequestVoteReply) {
146 RequestVoteReply reply = (RequestVoteReply) in;
// Surface the reply's vote decision as the ExpectMsg result.
147 return reply.isVoteGranted();
154 assertEquals(false, out);
161 * This test verifies that when an AppendEntries RPC is received by a RaftActor
162 * with a commitIndex that is greater than what has been applied to the
163 * state machine of the RaftActor, the RaftActor applies the state and
164 * sets its current applied state to the commitIndex of the sender.
// Sets lastApplied to 100 on the context, then handles an AppendEntries whose last
// constructor argument is 101 (the new commitIndex, per the inline comment below)
// and asserts that the context's lastApplied becomes 101.
169 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
170 new JavaTestKit(getSystem()) {{
172 RaftActorContext context =
173 createActorContext();
// Receiver state before the message: lastApplied 100, a last log entry created
// with (1, 100, empty payload), and snapshot index 99 on the replicated log.
175 context.setLastApplied(100);
176 setLastLogEntry((MockRaftActorContext) context, 1, 100,
177 new MockRaftActorContext.MockPayload(""));
178 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
180 List<ReplicatedLogEntry> entries =
182 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
183 new MockRaftActorContext.MockPayload("foo"))
186 // The new commitIndex is 101
187 AppendEntries appendEntries =
188 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
// The returned state is captured but not asserted on in the visible lines.
190 RaftState raftState =
191 createBehavior(context).handleMessage(getRef(), appendEntries);
193 assertEquals(101L, context.getLastApplied());
199 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
200 * which does not match the term that is in RaftActors log entry at prevLogIndex
201 * then the RaftActor does not change its state and it returns a failure.
// Handles an AppendEntries built as (100, "leader-1", 0, 0, null, 101) against a
// receiver whose last log entry was set with (20, 0, empty payload). Asserts that
// the returned state equals the one returned for an unrecognised message, and that
// an AppendEntriesReply with isSuccess() == false is received.
206 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
208 new JavaTestKit(getSystem()) {{
210 MockRaftActorContext context = (MockRaftActorContext)
211 createActorContext();
213 // First set the receivers term to lower number
214 context.getTermInformation().update(95, "test");
216 // Set the last log entry term for the receiver to be greater than
217 // what we will be sending as the prevLogTerm in AppendEntries
// NOTE(review): mockReplicatedLog is not referenced again in the visible lines.
218 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
219 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
221 // AppendEntries is now sent with a bigger term
222 // this will set the receivers term to be the same as the sender's term
223 AppendEntries appendEntries =
224 new AppendEntries(100, "leader-1", 0, 0, null, 101);
226 RaftActorBehavior behavior = createBehavior(context);
228 // Send an unknown message so that the state of the RaftActor remains unchanged
229 RaftState expected = behavior.handleMessage(getRef(), "unknown");
231 RaftState raftState =
232 behavior.handleMessage(getRef(), appendEntries);
// The AppendEntries must yield the same state as the unknown message did.
234 assertEquals(expected, raftState);
236 // Also expect an AppendEntriesReply to be sent where success is false
237 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
238 "AppendEntriesReply") {
239 // do not put code outside this method, will run afterwards
240 protected Boolean match(Object in) {
241 if (in instanceof AppendEntriesReply) {
242 AppendEntriesReply reply = (AppendEntriesReply) in;
243 return reply.isSuccess();
250 assertEquals(false, out);
259 * This test verifies that when a new AppendEntries message is received with
260 * new entries and the logs of the sender and receiver match that the new
261 * entries get added to the log and the log is incremented by the number of
262 * entries received in appendEntries
// Starts from a receiver log built from entries with indices 0..2 (term 1), handles
// an AppendEntries carrying entries with indices 3 and 4, and asserts that the log's
// last index + 1 is 5, that entries 3 and 4 are present, that the returned state
// equals the one returned for an unrecognised message, and that the
// AppendEntriesReply reports success.
267 public void testHandleAppendEntriesAddNewEntries() throws Exception {
268 new JavaTestKit(getSystem()) {{
270 MockRaftActorContext context = (MockRaftActorContext)
271 createActorContext();
273 // First set the receivers term to lower number
274 context.getTermInformation().update(1, "test");
276 // Prepare the receivers log
277 MockRaftActorContext.SimpleReplicatedLog log =
278 new MockRaftActorContext.SimpleReplicatedLog();
280 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
282 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
284 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
286 context.setReplicatedLog(log);
288 // Prepare the entries to be sent with AppendEntries
289 List<ReplicatedLogEntry> entries = new ArrayList<>();
291 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
293 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
295 // Send appendEntries with the same term as was set on the receiver
296 // before the new behavior was created (1 in this case)
297 // This will not work for a Candidate because as soon as a Candidate
298 // is created it increments the term
299 AppendEntries appendEntries =
300 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
302 RaftActorBehavior behavior = createBehavior(context);
304 // Send an unknown message so that the state of the RaftActor remains unchanged
305 RaftState expected = behavior.handleMessage(getRef(), "unknown");
307 RaftState raftState =
308 behavior.handleMessage(getRef(), appendEntries);
310 assertEquals(expected, raftState);
// The log should now extend through index 4, i.e. last index + 1 == 5.
311 assertEquals(5, log.last().getIndex() + 1);
312 assertNotNull(log.get(3));
313 assertNotNull(log.get(4));
315 // Also expect an AppendEntriesReply to be sent where success is true
316 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
317 "AppendEntriesReply") {
318 // do not put code outside this method, will run afterwards
319 protected Boolean match(Object in) {
320 if (in instanceof AppendEntriesReply) {
321 AppendEntriesReply reply = (AppendEntriesReply) in;
322 return reply.isSuccess();
329 assertEquals(true, out);
338 * This test verifies that when a new AppendEntries message is received with
339 * new entries and the logs of the sender and receiver are out-of-sync that
340 * the log is first corrected by removing the out of sync entries from the
341 * log and then adding in the new entries sent with the AppendEntries message
// Starts from a receiver log built from term-1 entries with indices 0..2 and handles
// an AppendEntries carrying term-2 entries for indices 2 ("two-1") and 3 ("three").
// Asserts that the log's last index + 1 is 4, that index 1 still holds "one", that
// index 2 now holds "two-1" and index 3 holds "three", that the returned state
// equals the one returned for an unrecognised message, and that the
// AppendEntriesReply reports success.
346 public void testHandleAppendEntriesCorrectReceiverLogEntries()
348 new JavaTestKit(getSystem()) {{
350 MockRaftActorContext context = (MockRaftActorContext)
351 createActorContext();
353 // First set the receivers term to lower number
354 context.getTermInformation().update(2, "test");
356 // Prepare the receivers log
357 MockRaftActorContext.SimpleReplicatedLog log =
358 new MockRaftActorContext.SimpleReplicatedLog();
360 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
362 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
364 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
366 context.setReplicatedLog(log);
368 // Prepare the entries to be sent with AppendEntries
369 List<ReplicatedLogEntry> entries = new ArrayList<>();
371 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
373 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
375 // Send appendEntries with the same term as was set on the receiver
376 // before the new behavior was created (2 in this case)
377 // This will not work for a Candidate because as soon as a Candidate
378 // is created it increments the term
379 AppendEntries appendEntries =
380 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
382 RaftActorBehavior behavior = createBehavior(context);
384 // Send an unknown message so that the state of the RaftActor remains unchanged
385 RaftState expected = behavior.handleMessage(getRef(), "unknown");
387 RaftState raftState =
388 behavior.handleMessage(getRef(), appendEntries);
390 assertEquals(expected, raftState);
392 // The entry at index 2 will be found out-of-sync with the leader
393 // and will be removed
394 // Then the two new entries will be added to the log
395 // Thus making the log to have 4 entries
396 assertEquals(4, log.last().getIndex() + 1);
397 assertNotNull(log.get(2));
// The entry before the conflict point must be untouched.
399 assertEquals("one", log.get(1).getData().toString());
401 // Check that the entry at index 2 has the new data
402 assertEquals("two-1", log.get(2).getData().toString());
404 assertEquals("three", log.get(3).getData().toString());
406 assertNotNull(log.get(3));
408 // Also expect an AppendEntriesReply to be sent where success is true
409 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
410 "AppendEntriesReply") {
411 // do not put code outside this method, will run afterwards
412 protected Boolean match(Object in) {
413 if (in instanceof AppendEntriesReply) {
414 AppendEntriesReply reply = (AppendEntriesReply) in;
415 return reply.isSuccess();
422 assertEquals(true, out);
430 * This test verifies that when InstallSnapshot is received by
431 * the follower it is applied correctly.
// Serialises a three-entry map with toByteString, feeds it to a Follower as
// InstallSnapshot messages (chunks come from getNextChunk, with the offset advanced
// by 50 per iteration), then sends a final InstallSnapshot. Asserts that an
// ApplySnapshot whose snapshot fields agree with that final message is observed,
// and that the collecting leader actor received three InstallSnapshotReply messages.
436 public void testHandleInstallSnapshot() throws Exception {
437 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
// leaderActor is a MessageCollectorActor; it is passed to handleMessage below and
// its collected messages are inspected at the end of the test.
439 ActorRef leaderActor = getSystem().actorOf(Props.create(
440 MessageCollectorActor.class));
// The follower's context wraps this test kit's own ref.
442 MockRaftActorContext context = (MockRaftActorContext)
443 createActorContext(getRef());
445 Follower follower = (Follower)createBehavior(context);
447 HashMap<String, String> followerSnapshot = new HashMap<>();
448 followerSnapshot.put("1", "A");
449 followerSnapshot.put("2", "B");
450 followerSnapshot.put("3", "C");
452 ByteString bsSnapshot = toByteString(followerSnapshot);
453 ByteString chunkData = ByteString.EMPTY;
455 int snapshotLength = bsSnapshot.size();
// NOTE(review): the loop header and the declarations of offset and i are not
// visible in this view.
459 chunkData = getNextChunk(bsSnapshot, offset);
460 final InstallSnapshot installSnapshot =
461 new InstallSnapshot(1, "leader-1", i, 1,
463 follower.handleMessage(leaderActor, installSnapshot);
464 offset = offset + 50;
466 } while ((offset+50) < snapshotLength);
// Final message; it reuses whatever chunkData currently holds and passes (3, 3) as
// its last two arguments — presumably chunk index and total chunks, confirm against
// InstallSnapshot.
468 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
469 follower.handleMessage(leaderActor, installSnapshot3);
// Map each ApplySnapshot seen within two seconds to a tag string: a "-mismatch"
// variant when a snapshot field differs from installSnapshot3, else "applySnapshot".
471 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
473 protected String match(Object o) throws Exception {
474 if (o instanceof ApplySnapshot) {
475 ApplySnapshot as = (ApplySnapshot)o;
476 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
477 return "applySnapshot-lastIndex-mismatch";
479 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
480 return "applySnapshot-lastAppliedTerm-mismatch";
482 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
483 return "applySnapshot-lastAppliedIndex-mismatch";
485 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
486 return "applySnapshot-lastTerm-mismatch";
488 return "applySnapshot";
// Pick out a tag that starts with "applySnapshot"; any mismatch tag makes the
// equality assertion below fail.
495 String applySnapshotMatch = "";
496 for (String reply: matches) {
497 if (reply.startsWith("applySnapshot")) {
498 applySnapshotMatch = reply;
502 assertEquals("applySnapshot", applySnapshotMatch);
504 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
506 assertNotNull(messages);
507 assertTrue(messages instanceof List);
// Unchecked cast; guarded only by the instanceof assertion above.
508 List<Object> listMessages = (List<Object>) messages;
// Count the InstallSnapshotReply messages collected by leaderActor.
510 int installSnapshotReplyReceivedCount = 0;
511 for (Object message: listMessages) {
512 if (message instanceof InstallSnapshotReply) {
513 ++installSnapshotReplyReceivedCount;
517 assertEquals(3, installSnapshotReplyReceivedCount);
// Returns whatever MessageCollectorActor.getAllMessages yields for the given actor.
// NOTE(review): the message parameter is not used in the visible body.
522 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
523 return MessageCollectorActor.getAllMessages(actor);
// Returns the slice of bs that begins at start and spans size bytes.
// NOTE(review): the declarations of start and size are not visible in this view;
// start is presumably derived from offset and size presumably defaults to 50 — confirm.
526 public ByteString getNextChunk (ByteString bs, int offset){
527 int snapshotLength = bs.size();
// If the whole snapshot is shorter than 50 bytes, the chunk is the full length.
530 if (50 > snapshotLength) {
531 size = snapshotLength;
// If a 50-byte chunk would run past the end, take only the remaining bytes.
533 if ((start + 50) > snapshotLength) {
534 size = snapshotLength - start;
537 return bs.substring(start, start + size);
540 private ByteString toByteString(Map<String, String> state) {
541 ByteArrayOutputStream b = null;
542 ObjectOutputStream o = null;
545 b = new ByteArrayOutputStream();
546 o = new ObjectOutputStream(b);
547 o.writeObject(state);
548 byte[] snapshotBytes = b.toByteArray();
549 return ByteString.copyFrom(snapshotBytes);
559 } catch (IOException e) {
560 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);