1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
6 import com.google.protobuf.ByteString;
8 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
9 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
10 import org.opendaylight.controller.cluster.raft.RaftActorContext;
11 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
12 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
13 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
14 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
16 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
20 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
21 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectOutputStream;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.List;
32 import static org.junit.Assert.assertEquals;
33 import static org.junit.Assert.assertNotNull;
34 import static org.junit.Assert.assertTrue;
36 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Default actor reference used by the no-argument createActorContext(); the actor is
// created from DoNothingActor on the test actor system.
38 private final ActorRef followerActor = getSystem().actorOf(Props.create(
39 DoNothingActor.class));
// Supplies the behavior under test: a new Follower built on the given context.
42 @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
43 return new Follower(actorContext);
// Creates the default context by delegating to createActorContext(ActorRef) with followerActor.
46 @Override protected RaftActorContext createActorContext() {
47 return createActorContext(followerActor);
// Creates a MockRaftActorContext from the literal "test", the test actor system and the
// supplied actor reference. NOTE(review): "test" is presumably the member id - confirm
// against the MockRaftActorContext constructor.
50 protected RaftActorContext createActorContext(ActorRef actorRef){
51 return new MockRaftActorContext("test", getSystem(), actorRef);
// Checks that, after a Follower is constructed, an ElectionTimeout message is received by
// the test kit's actor within six heartbeat intervals.
55 public void testThatAnElectionTimeoutIsTriggered(){
56 new JavaTestKit(getSystem()) {{
58 new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
59 protected void run() {
// The context is bound to the test kit's actor so that ExpectMsg below can observe
// what that actor receives.
61 Follower follower = new Follower(createActorContext(getTestActor()));
63 final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
64 // do not put code outside this method, will run afterwards
65 protected Boolean match(Object in) {
66 if (in instanceof ElectionTimeout) {
74 assertEquals(true, out);
// Checks that handling an ElectionTimeout message yields a Candidate behavior.
81 public void testHandleElectionTimeout(){
82 RaftActorContext raftActorContext = createActorContext();
84 new Follower(raftActorContext);
86 RaftActorBehavior raftBehavior =
87 follower.handleMessage(followerActor, new ElectionTimeout());
// The behavior returned by handleMessage is expected to be a Candidate.
89 assertTrue(raftBehavior instanceof Candidate);
// Checks that a RequestVote carrying the receiver's current term (1000) is granted when the
// receiver's term information was updated with a null votedFor.
93 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
94 new JavaTestKit(getSystem()) {{
96 new Within(duration("1 seconds")) {
97 protected void run() {
// Bind the context to the test kit's actor so the reply can be observed with ExpectMsg.
99 RaftActorContext context = createActorContext(getTestActor());
// Current term 1000, second argument (votedFor, per the test name) is null.
101 context.getTermInformation().update(1000, null);
103 RaftActorBehavior follower = createBehavior(context);
105 follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
107 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
108 // do not put code outside this method, will run afterwards
109 protected Boolean match(Object in) {
110 if (in instanceof RequestVoteReply) {
111 RequestVoteReply reply = (RequestVoteReply) in;
112 return reply.isVoteGranted();
// The vote must have been granted.
119 assertEquals(true, out);
// Checks that a RequestVote carrying the receiver's current term (1000) is rejected when the
// receiver's term information already records "test" and the request names "candidate".
126 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
127 new JavaTestKit(getSystem()) {{
129 new Within(duration("1 seconds")) {
130 protected void run() {
// Bind the context to the test kit's actor so the reply can be observed with ExpectMsg.
132 RaftActorContext context = createActorContext(getTestActor());
// Current term 1000, second argument (votedFor, per the test name) is "test".
134 context.getTermInformation().update(1000, "test");
136 RaftActorBehavior follower = createBehavior(context);
// NOTE(review): "candidate" is presumably the candidateId argument - confirm against RequestVote.
138 follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
140 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
141 // do not put code outside this method, will run afterwards
142 protected Boolean match(Object in) {
143 if (in instanceof RequestVoteReply) {
144 RequestVoteReply reply = (RequestVoteReply) in;
145 return reply.isVoteGranted();
// The vote must have been refused.
152 assertEquals(false, out);
159 * This test verifies that when an AppendEntries RPC is received by a RaftActor
160 * with a commitIndex that is greater than what has been applied to the
161 * state machine of the RaftActor, the RaftActor applies the state and
162 * sets its current applied state to the commitIndex of the sender.
// Checks that the context's lastApplied moves from 100 to 101 after the behavior handles an
// AppendEntries whose commit index is 101.
167 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
168 new JavaTestKit(getSystem()) {{
170 RaftActorContext context =
171 createActorContext();
// Starting state: lastApplied is 100, the last log entry is set via setLastLogEntry with
// (1, 100) and the replicated log's snapshot index is 99.
173 context.setLastApplied(100);
174 setLastLogEntry((MockRaftActorContext) context, 1, 100,
175 new MockRaftActorContext.MockPayload(""));
176 ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
// A single new entry (2, 101) with payload "foo" is sent along with the AppendEntries.
178 List<ReplicatedLogEntry> entries =
180 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
181 new MockRaftActorContext.MockPayload("foo"))
184 // The new commitIndex is 101
185 AppendEntries appendEntries =
186 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
188 RaftActorBehavior raftBehavior =
189 createBehavior(context).handleMessage(getRef(), appendEntries);
// lastApplied must now equal the commit index carried by the message.
191 assertEquals(101L, context.getLastApplied());
197 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
198 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
199 * then the RaftActor does not change its state and it returns a failure.
// Checks that an AppendEntries is answered with an unsuccessful AppendEntriesReply, and that
// the returned behavior equals the one returned for an unknown message, when the receiver's
// last log entry was set up with a different term than the one the message carries.
204 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
206 new JavaTestKit(getSystem()) {{
208 MockRaftActorContext context = (MockRaftActorContext)
209 createActorContext();
211 // First set the receivers term to lower number
212 context.getTermInformation().update(95, "test");
214 // Set the last log entry term for the receiver to be greater than
215 // what we will be sending as the prevLogTerm in AppendEntries
216 MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
217 setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
219 // AppendEntries is now sent with a bigger term
220 // this will set the receivers term to be the same as the sender's term
221 AppendEntries appendEntries =
222 new AppendEntries(100, "leader-1", 0, 0, null, 101);
224 RaftActorBehavior behavior = createBehavior(context);
226 // Send an unknown message so that the state of the RaftActor remains unchanged
227 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
229 RaftActorBehavior raftBehavior =
230 behavior.handleMessage(getRef(), appendEntries);
// Handling the AppendEntries must return the same behavior as the unknown message did.
232 assertEquals(expected, raftBehavior);
234 // Also expect an AppendEntriesReply to be sent where success is false
235 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
236 "AppendEntriesReply") {
237 // do not put code outside this method, will run afterwards
238 protected Boolean match(Object in) {
239 if (in instanceof AppendEntriesReply) {
240 AppendEntriesReply reply = (AppendEntriesReply) in;
241 return reply.isSuccess();
248 assertEquals(false, out);
257 * This test verifies that when a new AppendEntries message is received with
258 * new entries and the logs of the sender and receiver match that the new
259 * entries get added to the log and the log is incremented by the number of
260 * entries received in appendEntries
// Checks that entries at indexes 3 and 4 sent in an AppendEntries end up in a receiver log
// that was prepared with indexes 0..2, and that a successful AppendEntriesReply is sent.
265 public void testHandleAppendEntriesAddNewEntries() throws Exception {
266 new JavaTestKit(getSystem()) {{
268 MockRaftActorContext context = (MockRaftActorContext)
269 createActorContext();
271 // First set the receiver's term to the value the sender will use (1)
272 context.getTermInformation().update(1, "test");
274 // Prepare the receivers log
275 MockRaftActorContext.SimpleReplicatedLog log =
276 new MockRaftActorContext.SimpleReplicatedLog();
278 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
280 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
282 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
284 context.setReplicatedLog(log);
286 // Prepare the entries to be sent with AppendEntries
287 List<ReplicatedLogEntry> entries = new ArrayList<>();
289 new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
291 new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
293 // Send appendEntries with the same term as was set on the receiver
294 // before the new behavior was created (1 in this case)
295 // This will not work for a Candidate because as soon as a Candidate
296 // is created it increments the term
297 AppendEntries appendEntries =
298 new AppendEntries(1, "leader-1", 2, 1, entries, 4);
300 RaftActorBehavior behavior = createBehavior(context);
302 // Send an unknown message so that the state of the RaftActor remains unchanged
303 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
305 RaftActorBehavior raftBehavior =
306 behavior.handleMessage(getRef(), appendEntries);
// Same behavior as before, and the log now ends at index 4 with entries present at 3 and 4.
308 assertEquals(expected, raftBehavior);
309 assertEquals(5, log.last().getIndex() + 1);
310 assertNotNull(log.get(3));
311 assertNotNull(log.get(4));
313 // Also expect an AppendEntriesReply to be sent where success is true
314 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
315 "AppendEntriesReply") {
316 // do not put code outside this method, will run afterwards
317 protected Boolean match(Object in) {
318 if (in instanceof AppendEntriesReply) {
319 AppendEntriesReply reply = (AppendEntriesReply) in;
320 return reply.isSuccess();
327 assertEquals(true, out);
336 * This test verifies that when a new AppendEntries message is received with
337 * new entries and the logs of the sender and receiver are out-of-sync that
338 * the log is first corrected by removing the out of sync entries from the
339 * log and then adding in the new entries sent with the AppendEntries message
// Checks that, when the AppendEntries carries a different entry for index 2 than the one in
// the receiver's log, the receiver ends up with "two-1" at index 2 and "three" at index 3
// while index 1 keeps "one", and that a successful AppendEntriesReply is sent.
344 public void testHandleAppendEntriesCorrectReceiverLogEntries()
346 new JavaTestKit(getSystem()) {{
348 MockRaftActorContext context = (MockRaftActorContext)
349 createActorContext();
351 // First set the receiver's term to the value the sender will use (2)
352 context.getTermInformation().update(2, "test");
354 // Prepare the receivers log
355 MockRaftActorContext.SimpleReplicatedLog log =
356 new MockRaftActorContext.SimpleReplicatedLog();
358 new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
360 new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
362 new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
364 context.setReplicatedLog(log);
366 // Prepare the entries to be sent with AppendEntries
367 List<ReplicatedLogEntry> entries = new ArrayList<>();
369 new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
371 new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
373 // Send appendEntries with the same term as was set on the receiver
374 // before the new behavior was created (2 in this case)
375 // This will not work for a Candidate because as soon as a Candidate
376 // is created it increments the term
377 AppendEntries appendEntries =
378 new AppendEntries(2, "leader-1", 1, 1, entries, 3);
380 RaftActorBehavior behavior = createBehavior(context);
382 // Send an unknown message so that the state of the RaftActor remains unchanged
383 RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
385 RaftActorBehavior raftBehavior =
386 behavior.handleMessage(getRef(), appendEntries);
388 assertEquals(expected, raftBehavior);
390 // The entry at index 2 will be found out-of-sync with the leader
391 // and will be removed
392 // Then the two new entries will be added to the log
393 // Thus making the log to have 4 entries
394 assertEquals(4, log.last().getIndex() + 1);
395 assertNotNull(log.get(2));
// The entry before the conflict must be untouched.
397 assertEquals("one", log.get(1).getData().toString());
399 // Check that the entry at index 2 has the new data
400 assertEquals("two-1", log.get(2).getData().toString());
402 assertEquals("three", log.get(3).getData().toString());
404 assertNotNull(log.get(3));
406 // Also expect an AppendEntriesReply to be sent where success is true
407 final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
408 "AppendEntriesReply") {
409 // do not put code outside this method, will run afterwards
410 protected Boolean match(Object in) {
411 if (in instanceof AppendEntriesReply) {
412 AppendEntriesReply reply = (AppendEntriesReply) in;
413 return reply.isSuccess();
420 assertEquals(true, out);
428 * This test verifies that when InstallSnapshot is received by
429 * the follower it is applied correctly.
// Sends a serialized map to the follower as a series of InstallSnapshot messages, then checks
// that an ApplySnapshot whose indexes and terms match the final InstallSnapshot is received
// by the test kit's actor, and that the leader actor collected three InstallSnapshotReply
// messages.
434 public void testHandleInstallSnapshot() throws Exception {
435 JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
// The leader is a MessageCollectorActor so the replies sent to it can be inspected later.
437 ActorRef leaderActor = getSystem().actorOf(Props.create(
438 MessageCollectorActor.class));
// The follower's context is bound to the test kit's actor (getRef()).
440 MockRaftActorContext context = (MockRaftActorContext)
441 createActorContext(getRef());
443 Follower follower = (Follower)createBehavior(context);
// Snapshot payload: a three-entry map serialized to a ByteString.
445 HashMap<String, String> followerSnapshot = new HashMap<>();
446 followerSnapshot.put("1", "A");
447 followerSnapshot.put("2", "B");
448 followerSnapshot.put("3", "C");
450 ByteString bsSnapshot = toByteString(followerSnapshot);
451 ByteString chunkData = ByteString.EMPTY;
453 int snapshotLength = bsSnapshot.size();
// Send one chunk per iteration, advancing the offset by 50 bytes each time.
// NOTE(review): the declarations of offset and i are not visible in this extract.
457 chunkData = getNextChunk(bsSnapshot, offset);
458 final InstallSnapshot installSnapshot =
459 new InstallSnapshot(1, "leader-1", i, 1,
461 follower.handleMessage(leaderActor, installSnapshot);
462 offset = offset + 50;
464 } while ((offset+50) < snapshotLength);
// A further InstallSnapshot is sent after the loop; its values are what the ApplySnapshot
// is compared against below.
466 final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
467 follower.handleMessage(leaderActor, installSnapshot3);
// Map each ApplySnapshot seen within two seconds to "applySnapshot", or to a
// "...-mismatch" label naming the first field that differs from installSnapshot3.
469 String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
471 protected String match(Object o) throws Exception {
472 if (o instanceof ApplySnapshot) {
473 ApplySnapshot as = (ApplySnapshot)o;
474 if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
475 return "applySnapshot-lastIndex-mismatch";
477 if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
478 return "applySnapshot-lastAppliedTerm-mismatch";
480 if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
481 return "applySnapshot-lastAppliedIndex-mismatch";
483 if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
484 return "applySnapshot-lastTerm-mismatch";
486 return "applySnapshot";
// Pick out the label produced for the ApplySnapshot; any mismatch label fails the
// equality assertion below.
493 String applySnapshotMatch = "";
494 for (String reply: matches) {
495 if (reply.startsWith("applySnapshot")) {
496 applySnapshotMatch = reply;
500 assertEquals("applySnapshot", applySnapshotMatch);
// Fetch everything the leader actor collected and count the InstallSnapshotReply messages.
502 Object messages = executeLocalOperation(leaderActor, "get-all-messages");
504 assertNotNull(messages);
505 assertTrue(messages instanceof List);
506 List<Object> listMessages = (List<Object>) messages;
508 int installSnapshotReplyReceivedCount = 0;
509 for (Object message: listMessages) {
510 if (message instanceof InstallSnapshotReply) {
511 ++installSnapshotReplyReceivedCount;
// Exactly three replies are expected.
515 assertEquals(3, installSnapshotReplyReceivedCount);
// Returns whatever MessageCollectorActor.getAllMessages reports for the given actor.
// NOTE(review): the message parameter is not used by the visible body.
520 public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
521 return MessageCollectorActor.getAllMessages(actor);
// Returns the slice of bs covering [start, start + size).
// NOTE(review): the declarations of start and size are not visible in this extract;
// presumably start is the offset argument and size defaults to 50 - confirm.
524 public ByteString getNextChunk (ByteString bs, int offset){
525 int snapshotLength = bs.size();
// If the whole snapshot is shorter than 50 bytes, the chunk size is the snapshot length.
528 if (50 > snapshotLength) {
529 size = snapshotLength;
// If fewer than 50 bytes remain after start, the chunk is shortened to what is left.
531 if ((start + 50) > snapshotLength) {
532 size = snapshotLength - start;
535 return bs.substring(start, start + size);
538 private ByteString toByteString(Map<String, String> state) {
539 ByteArrayOutputStream b = null;
540 ObjectOutputStream o = null;
543 b = new ByteArrayOutputStream();
544 o = new ObjectOutputStream(b);
545 o.writeObject(state);
546 byte[] snapshotBytes = b.toByteArray();
547 return ByteString.copyFrom(snapshotBytes);
557 } catch (IOException e) {
558 org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);