2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
14 import akka.actor.Actor;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.testkit.TestActorRef;
18 import com.google.common.collect.ImmutableMap;
19 import java.util.List;
20 import java.util.concurrent.TimeUnit;
21 import org.junit.Test;
22 import org.opendaylight.controller.cluster.notifications.RoleChanged;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
29 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
30 import scala.concurrent.duration.FiniteDuration;
33 * Tests PreLeader raft state functionality end-to-end.
35 * @author Thomas Pantelis
37 public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest {
// Test actor handed to follower1 as its role-change notifier (see createRaftActors); the test reads
// the RoleChanged messages it receives to check follower1's state transitions.
39 private TestActorRef<Actor> follower1NotifierActor;
// Config shared by both followers; kept in a field because the test reuses it when it re-creates
// follower1 after killing it.
40 private DefaultConfigParamsImpl followerConfigParams;
/**
 * Scenario: the leader is killed while follower1 holds a payload entry that was never committed and
 * follower2 has not received that entry at all. follower1 is then sent TimeoutNow and the test
 * asserts that it reports Candidate, PreLeader and Leader role changes in that order, that the old
 * payload entry plus a NoopPayload entry end up committed and applied on both followers, and that
 * follower1's log is unchanged after follower1 is killed and re-created.
 *
 * NOTE(review): nothing in the visible body invokes createRaftActors(), yet leaderActor,
 * follower1Context, follower2Context and currentTerm (all assigned there) are used from the first
 * statement. The listing numbers embedded in these lines skip from 44 to 48, so the setup call was
 * presumably dropped when this listing was produced - confirm against the real file.
 *
 * @throws Exception declared by the signature; no checked exception is thrown explicitly here
 */
43 public void testUnComittedEntryOnLeaderChange() throws Exception {
44 testLog.info("testUnComittedEntryOnLeaderChange starting");
48 // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry.
49 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
// follower2 drops AppendEntries so that it never receives the payload entry (asserted further down).
50 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
52 // Send a payload and verify AppendEntries is received in follower1.
53 MockPayload payload0 = sendPayloadData(leaderActor, "zero");
55 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
56 assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size());
57 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0);
59 // Kill the leader actor.
60 killActor(leaderActor);
62 // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not
63 // received the payload entry yet.
64 assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
65 assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex());
// -1 is the value expected here for "nothing committed / nothing applied yet".
66 assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex());
67 assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied());
69 assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size());
// Let follower2 receive AppendEntries again so the new leader can replicate to it.
71 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
// Presumably discards role changes recorded during setup so that only the transitions triggered
// below are counted - verify against MessageCollectorActor.
72 clearMessages(follower1NotifierActor);
74 // Force follower1 to start an election. It should win since its journal is more up-to-date than
75 // follower2's journal.
76 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
78 // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
79 List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
80 assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
81 assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
82 assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
// Remember the old leader's term, then re-read the term from follower1: entry 0 is checked against
// the previous term and the NoopPayload entry against the new one.
84 long previousTerm = currentTerm;
85 currentTerm = follower1Context.getTermInformation().getCurrentTerm();
87 // Since it went to Leader, it should've appended and successfully replicated a NoopPayload with the
88 // new term to follower2 and committed both entries, including the first payload from the previous term.
89 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
90 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
91 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
92 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
93 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
95 // Both entries should be applied to the state.
96 expectMatching(follower1CollectorActor, ApplyState.class, 2);
97 expectMatching(follower2CollectorActor, ApplyState.class, 2);
99 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
101 // Verify follower2's journal matches follower1's.
102 assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size());
103 assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex());
104 assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex());
105 assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied());
106 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
107 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
109 // Reinstate follower1.
110 killActor(follower1Actor);
// Re-created under the same id with the same peers and follower config; unlike the original
// follower1, no role-change notifier is attached this time.
112 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
113 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
114 config(followerConfigParams));
115 follower1Actor.underlyingActor().waitForRecoveryComplete();
// The cached context belonged to the killed actor, so fetch the new actor's context.
116 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
118 // Verify follower1's journal was persisted and recovered correctly.
119 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
120 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
121 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
122 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
123 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
124 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
126 testLog.info("testUnComittedEntryOnLeaderChange ending");
/**
 * Builds the three-node cluster used by this test: creates follower1 (with a role-change notifier),
 * follower2 and the leader actor, sends the leader actor TimeoutNow and waits until it is leader,
 * waits for two AppendEntriesReply messages, clears all three collector actors, and finally caches
 * the leader term and each actor's RaftActorContext in the inherited fields.
 */
129 private void createRaftActors() {
130 testLog.info("createRaftActors starting");
// Plain MessageCollectorActor that will receive follower1's role-change notifications.
132 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
133 factory.generateActorId(follower1Id + "-notifier"));
135 followerConfigParams = newFollowerConfigParams();
136 followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
137 followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
// follower1 is built through the builder so the notifier can be attached; follower2 below uses the
// plain overload and has no notifier.
138 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
139 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
140 config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
142 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
143 follower1Id, testActorPath(follower1Id)), followerConfigParams);
// The leader's peer map is built from the actual paths of the two follower actors just created.
145 peerAddresses = ImmutableMap.<String, String>builder().
146 put(follower1Id, follower1Actor.path().toString()).
147 put(follower2Id, follower2Actor.path().toString()).build();
149 leaderConfigParams = newLeaderConfigParams();
// NOTE(review): a one-day heartbeat interval presumably keeps the leader from sending periodic
// heartbeats during the test, so the test controls when AppendEntries go out - confirm.
150 leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
151 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
153 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
154 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
155 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
// Trigger an election on the intended leader right away instead of waiting for a timeout.
157 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
158 waitUntilLeader(leaderActor);
// Presumably one reply per follower, i.e. both followers have acknowledged the new leader - confirm.
160 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
// Drop everything collected during setup so the test only sees messages it causes itself.
162 clearMessages(leaderCollectorActor);
163 clearMessages(follower1CollectorActor);
164 clearMessages(follower2CollectorActor);
166 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
167 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
169 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
170 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
172 testLog.info("createRaftActors ending");