1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.testkit.JavaTestKit;
7 import com.google.protobuf.ByteString;
8 import org.junit.Assert;
10 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
11 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
12 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
13 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftActorContext;
15 import org.opendaylight.controller.cluster.raft.RaftState;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
20 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
21 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
23 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
26 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.ObjectOutputStream;
31 import java.util.HashMap;
33 import java.util.concurrent.atomic.AtomicLong;
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertNotNull;
37 import static org.junit.Assert.assertTrue;
39 public class LeaderTest extends AbstractRaftActorBehaviorTest {
// Actor created from DoNothingActor; createActorContext() passes it as the actor
// reference of the MockRaftActorContext used by most tests in this class.
41 private ActorRef leaderActor =
42 getSystem().actorOf(Props.create(DoNothingActor.class));
// Actor created from DoNothingActor; the tests pass it as the sender argument of
// handleMessage().
43 private ActorRef senderActor =
44 getSystem().actorOf(Props.create(DoNothingActor.class));
47 public void testHandleMessageForUnknownMessage() throws Exception {
48 new JavaTestKit(getSystem()) {{
50 new Leader(createActorContext());
52 // handle message should return the Leader state when it receives an
54 RaftState state = leader.handleMessage(senderActor, "foo");
55 Assert.assertEquals(RaftState.Leader, state);
61 public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
62 new JavaTestKit(getSystem()) {{
64 new Within(duration("1 seconds")) {
65 protected void run() {
67 ActorRef followerActor = getTestActor();
69 MockRaftActorContext actorContext =
70 (MockRaftActorContext) createActorContext();
72 Map<String, String> peerAddresses = new HashMap();
74 peerAddresses.put(followerActor.path().toString(),
75 followerActor.path().toString());
77 actorContext.setPeerAddresses(peerAddresses);
79 Leader leader = new Leader(actorContext);
80 leader.handleMessage(senderActor, new SendHeartBeat());
83 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
84 // do not put code outside this method, will run afterwards
85 protected String match(Object in) {
86 Object msg = fromSerializableMessage(in);
87 if (msg instanceof AppendEntries) {
88 if (((AppendEntries)msg).getTerm() == 0) {
96 }.get(); // this extracts the received message
98 assertEquals("match", out);
// Checks that handling a Replicate message leaves the behavior in the Leader state
// and that the follower (the test kit's own test actor) is sent an AppendEntries
// message whose term is 0.
// NOTE(review): the embedded listing numbers skip values in this method (for example
// 127 -> 129 and 141 -> 149), so some original lines are not shown here; the comments
// describe only what is visible.
106 public void testHandleReplicateMessageSendAppendEntriesToFollower() {
107 new JavaTestKit(getSystem()) {{
109 new Within(duration("1 seconds")) {
110 protected void run() {
112 ActorRef followerActor = getTestActor();
114 MockRaftActorContext actorContext =
115 (MockRaftActorContext) createActorContext();
// NOTE(review): raw HashMap assigned to Map<String, String>; other tests in this
// file use new HashMap<>().
117 Map<String, String> peerAddresses = new HashMap();
// The follower's actor path string is used both as the map key and as its value.
119 peerAddresses.put(followerActor.path().toString(),
120 followerActor.path().toString());
122 actorContext.setPeerAddresses(peerAddresses);
124 Leader leader = new Leader(actorContext);
// Hand the leader a Replicate message wrapping a mock log entry with payload "foo".
125 RaftState raftState = leader
126 .handleMessage(senderActor, new Replicate(null, null,
127 new MockRaftActorContext.MockReplicatedLogEntry(1,
129 new MockRaftActorContext.MockPayload("foo"))
132 // State should not change
133 assertEquals(RaftState.Leader, raftState);
// Wait up to one second for a message on the test actor and classify it.
136 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
137 // do not put code outside this method, will run afterwards
138 protected String match(Object in) {
139 Object msg = fromSerializableMessage(in);
140 if (msg instanceof AppendEntries) {
141 if (((AppendEntries)msg).getTerm() == 0) {
149 }.get(); // this extracts the received message
// The matcher's result must be the literal "match".
151 assertEquals("match", out);
// Checks the Replicate path when no peer addresses are configured on the context:
// the behavior stays Leader, the context's commit index becomes 1, and the raft actor
// (the test kit's own test actor) receives an ApplyState whose identifier is "state-id".
// NOTE(review): the embedded listing numbers skip values in this method (for example
// 193 -> 195 and 198 -> 206), so some original lines are not shown here.
161 public void testHandleReplicateMessageWhenThereAreNoFollowers() {
162 new JavaTestKit(getSystem()) {{
164 new Within(duration("1 seconds")) {
165 protected void run() {
167 ActorRef raftActor = getTestActor();
// Built directly rather than through createActorContext(), so that the test actor
// is the context's actor.
169 MockRaftActorContext actorContext =
170 new MockRaftActorContext("test", getSystem(), raftActor);
// Drop whatever the mock log holds from index 0, then append two entries
// constructed with (0, 1, payload) and (1, 1, payload).
172 actorContext.getReplicatedLog().removeFrom(0);
174 actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
175 new MockRaftActorContext.MockPayload("foo")));
177 ReplicatedLogImplEntry entry =
178 new ReplicatedLogImplEntry(1, 1,
179 new MockRaftActorContext.MockPayload("foo"));
181 actorContext.getReplicatedLog().append(entry);
183 Leader leader = new Leader(actorContext);
// Replicate the second entry under the identifier "state-id".
184 RaftState raftState = leader
185 .handleMessage(senderActor, new Replicate(null, "state-id",entry));
187 // State should not change
188 assertEquals(RaftState.Leader, raftState);
// The commit index is expected to be 1 after the Replicate message is handled.
190 assertEquals(1, actorContext.getCommitIndex());
// Wait up to one second for a message on the test actor and classify it.
193 new ExpectMsg<String>(duration("1 seconds"),
195 // do not put code outside this method, will run afterwards
196 protected String match(Object in) {
197 if (in instanceof ApplyState) {
198 if (((ApplyState) in).getIdentifier().equals("state-id")) {
206 }.get(); // this extracts the received message
// The matcher's result must be the literal "match".
208 assertEquals("match", out);
216 public void testSendInstallSnapshot() {
217 new LeaderTestKit(getSystem()) {{
219 new Within(duration("1 seconds")) {
220 protected void run() {
221 ActorRef followerActor = getTestActor();
223 Map<String, String> peerAddresses = new HashMap();
224 peerAddresses.put(followerActor.path().toString(),
225 followerActor.path().toString());
228 MockRaftActorContext actorContext =
229 (MockRaftActorContext) createActorContext(getRef());
230 actorContext.setPeerAddresses(peerAddresses);
233 Map<String, String> leadersSnapshot = new HashMap<>();
234 leadersSnapshot.put("1", "A");
235 leadersSnapshot.put("2", "B");
236 leadersSnapshot.put("3", "C");
239 actorContext.getReplicatedLog().removeFrom(0);
241 final int followersLastIndex = 2;
242 final int snapshotIndex = 3;
243 final int newEntryIndex = 4;
244 final int snapshotTerm = 1;
245 final int currentTerm = 2;
247 // set the snapshot variables in replicatedlog
248 actorContext.getReplicatedLog().setSnapshot(
249 toByteString(leadersSnapshot));
250 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
251 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
253 MockLeader leader = new MockLeader(actorContext);
254 // set the follower info in leader
255 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
258 ReplicatedLogImplEntry entry =
259 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
260 new MockRaftActorContext.MockPayload("D"));
262 // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
263 RaftState raftState = leader.handleMessage(
264 senderActor, new Replicate(null, "state-id", entry));
266 assertEquals(RaftState.Leader, raftState);
268 // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
269 Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
271 protected Boolean match(Object o) throws Exception {
272 if (o instanceof SendInstallSnapshot) {
279 boolean sendInstallSnapshotReceived = false;
280 for (Boolean b: matches) {
281 sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
284 assertTrue(sendInstallSnapshotReceived);
292 public void testInstallSnapshot() {
293 new LeaderTestKit(getSystem()) {{
295 new Within(duration("1 seconds")) {
296 protected void run() {
297 ActorRef followerActor = getTestActor();
299 Map<String, String> peerAddresses = new HashMap();
300 peerAddresses.put(followerActor.path().toString(),
301 followerActor.path().toString());
303 MockRaftActorContext actorContext =
304 (MockRaftActorContext) createActorContext();
305 actorContext.setPeerAddresses(peerAddresses);
308 Map<String, String> leadersSnapshot = new HashMap<>();
309 leadersSnapshot.put("1", "A");
310 leadersSnapshot.put("2", "B");
311 leadersSnapshot.put("3", "C");
314 actorContext.getReplicatedLog().removeFrom(0);
316 final int followersLastIndex = 2;
317 final int snapshotIndex = 3;
318 final int newEntryIndex = 4;
319 final int snapshotTerm = 1;
320 final int currentTerm = 2;
322 // set the snapshot variables in replicatedlog
323 actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
324 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
325 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
327 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
329 MockLeader leader = new MockLeader(actorContext);
330 // set the follower info in leader
331 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
334 ReplicatedLogImplEntry entry =
335 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
336 new MockRaftActorContext.MockPayload("D"));
339 RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
341 assertEquals(RaftState.Leader, raftState);
343 // check if installsnapshot gets called with the correct values.
345 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
346 // do not put code outside this method, will run afterwards
347 protected String match(Object in) {
348 if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
349 InstallSnapshot is = (InstallSnapshot)
350 SerializationUtils.fromSerializable(in);
351 if (is.getData() == null) {
352 return "InstallSnapshot data is null";
354 if (is.getLastIncludedIndex() != snapshotIndex) {
355 return is.getLastIncludedIndex() + "!=" + snapshotIndex;
357 if (is.getLastIncludedTerm() != snapshotTerm) {
358 return is.getLastIncludedTerm() + "!=" + snapshotTerm;
360 if (is.getTerm() == currentTerm) {
361 return is.getTerm() + "!=" + currentTerm;
367 return "message mismatch:" + in.getClass();
370 }.get(); // this extracts the received message
372 assertEquals("match", out);
379 public void testHandleInstallSnapshotReplyLastChunk() {
380 new LeaderTestKit(getSystem()) {{
381 new Within(duration("1 seconds")) {
382 protected void run() {
383 ActorRef followerActor = getTestActor();
385 Map<String, String> peerAddresses = new HashMap();
386 peerAddresses.put(followerActor.path().toString(),
387 followerActor.path().toString());
389 MockRaftActorContext actorContext =
390 (MockRaftActorContext) createActorContext();
391 actorContext.setPeerAddresses(peerAddresses);
393 final int followersLastIndex = 2;
394 final int snapshotIndex = 3;
395 final int newEntryIndex = 4;
396 final int snapshotTerm = 1;
397 final int currentTerm = 2;
399 MockLeader leader = new MockLeader(actorContext);
400 // set the follower info in leader
401 leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
403 Map<String, String> leadersSnapshot = new HashMap<>();
404 leadersSnapshot.put("1", "A");
405 leadersSnapshot.put("2", "B");
406 leadersSnapshot.put("3", "C");
408 // set the snapshot variables in replicatedlog
409 actorContext.getReplicatedLog().setSnapshot(
410 toByteString(leadersSnapshot));
411 actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
412 actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
413 actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
415 ByteString bs = toByteString(leadersSnapshot);
416 leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
417 while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
418 leader.getFollowerToSnapshot().getNextChunk();
419 leader.getFollowerToSnapshot().incrementChunkIndex();
423 actorContext.getReplicatedLog().removeFrom(0);
425 RaftState raftState = leader.handleMessage(senderActor,
426 new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
427 leader.getFollowerToSnapshot().getChunkIndex(), true));
429 assertEquals(RaftState.Leader, raftState);
431 assertEquals(leader.mapFollowerToSnapshot.size(), 0);
432 assertEquals(leader.followerToLog.size(), 1);
433 assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
434 FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
435 assertEquals(snapshotIndex, fli.getMatchIndex().get());
436 assertEquals(snapshotIndex, fli.getMatchIndex().get());
437 assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
// Walks a serialized snapshot in steps of 50 bytes and checks, for every step, the size
// of the chunk returned by getNextChunk() and the chunk index reported by the leader's
// FollowerToSnapshot; finally checks the total number of chunks.
// NOTE(review): the embedded listing numbers skip values in this method (for example
// 450 -> 455, 466 -> 469 and 469 -> 473). The declarations/updates of chunkIndex and j,
// and the body of getSnapshotChunkSize(), are on lines not shown here.
444 public void testFollowerToSnapshotLogic() {
446 MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
// Override the snapshot chunk size through an anonymous DefaultConfigParamsImpl
// (presumably 50, to agree with the loop step below - confirm).
448 actorContext.setConfigParams(new DefaultConfigParamsImpl() {
450 public int getSnapshotChunkSize() {
455 MockLeader leader = new MockLeader(actorContext);
457 Map<String, String> leadersSnapshot = new HashMap<>();
458 leadersSnapshot.put("1", "A");
459 leadersSnapshot.put("2", "B");
460 leadersSnapshot.put("3", "C");
// Serialize the map and keep a plain byte[] copy to drive the loop bounds.
462 ByteString bs = toByteString(leadersSnapshot);
463 byte[] barray = bs.toByteArray();
465 leader.createFollowerToSnapshot("followerId", bs);
466 assertEquals(bs.size(), barray.length);
// i is the start offset of the current chunk; it advances by 50 per iteration.
469 for (int i=0; i < barray.length; i = i + 50) {
// True only when the chunk starting at i would run past the end of the data.
473 if (i + 50 > barray.length) {
477 ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
// j - i is the expected size of this chunk.
478 assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
479 assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
// Mark the chunk as sent and advance the chunk index unless this was the last chunk.
481 leader.getFollowerToSnapshot().markSendStatus(true);
482 if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
483 leader.getFollowerToSnapshot().incrementChunkIndex();
// After the loop chunkIndex is compared with the total chunk count.
487 assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
491 @Override protected RaftActorBehavior createBehavior(
492 RaftActorContext actorContext) {
493 return new Leader(actorContext);
496 @Override protected RaftActorContext createActorContext() {
497 return createActorContext(leaderActor);
500 protected RaftActorContext createActorContext(ActorRef actorRef) {
501 return new MockRaftActorContext("test", getSystem(), actorRef);
504 private ByteString toByteString(Map<String, String> state) {
505 ByteArrayOutputStream b = null;
506 ObjectOutputStream o = null;
509 b = new ByteArrayOutputStream();
510 o = new ObjectOutputStream(b);
511 o.writeObject(state);
512 byte[] snapshotBytes = b.toByteArray();
513 return ByteString.copyFrom(snapshotBytes);
523 } catch (IOException e) {
524 Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
// JavaTestKit subclass used by the snapshot-related tests in this file, which
// instantiate it as new LeaderTestKit(getSystem()).
// NOTE(review): the embedded listing numbers skip values in this class (for example
// 531 -> 535, 538 -> 541 and 544 -> 546), so the constructor body, the body of run()
// and one chained call are not shown here.
529 private static class LeaderTestKit extends JavaTestKit {
// Constructor taking the actor system (body not visible; presumably delegates to
// JavaTestKit's constructor - confirm).
531 private LeaderTestKit(ActorSystem actorSystem) {
// Builds a JavaTestKit.EventFilter for logLevel, restricts it to events from the
// subject's actor path, expects exactly one occurrence, executes it and asserts that
// the result is true.
// NOTE(review): logLevel is a raw Class, and logMessage is not referenced on any
// visible line - presumably it is consumed on the omitted line between from(...) and
// occurrences(1); confirm.
535 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
536 // Wait for a specific log message to show up
537 final boolean result =
538 new JavaTestKit.EventFilter<Boolean>(logLevel
541 protected Boolean run() {
544 }.from(subject.path().toString())
546 .occurrences(1).exec();
548 Assert.assertEquals(true, result);
// Leader subclass that lets the tests seed and inspect the leader's per-follower
// state (the followerToLog and mapFollowerToSnapshot maps referenced below).
// NOTE(review): the embedded listing numbers skip values in this class (for example
// 557 -> 561 and 569 -> 573), so the constructor body and the body of
// getFollowerToSnapshot() are not shown here.
553 class MockLeader extends Leader {
// Most recently created FollowerToSnapshot; assigned in createFollowerToSnapshot().
555 FollowerToSnapshot fts;
// Constructor taking the raft actor context (body not visible; presumably delegates
// to Leader's constructor - confirm).
557 public MockLeader(RaftActorContext context){
// Registers log information for followerId in followerToLog, built from the given
// next and match indexes (each wrapped in an AtomicLong).
561 public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
562 FollowerLogInformation followerLogInformation =
563 new FollowerLogInformationImpl(followerId,
564 new AtomicLong(nextIndex),
565 new AtomicLong(matchIndex));
566 followerToLog.put(followerId, followerLogInformation);
// Accessor for the snapshot-transfer state (body not visible; presumably returns
// fts - confirm).
569 public FollowerToSnapshot getFollowerToSnapshot() {
// Creates a FollowerToSnapshot over bs, remembers it in fts and registers it in
// mapFollowerToSnapshot under followerId.
573 public void createFollowerToSnapshot(String followerId, ByteString bs ) {
574 fts = new FollowerToSnapshot(bs);
575 mapFollowerToSnapshot.put(followerId, fts);