/*
 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.Actor;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.testkit.TestActorRef;
19 import com.google.common.collect.ImmutableMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.notifications.RoleChanged;
24 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
26 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
30 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
31 import scala.concurrent.duration.FiniteDuration;
34 * Tests PreLeader raft state functionality end-to-end.
36 * @author Thomas Pantelis
38 public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest {
40 private TestActorRef<Actor> follower1NotifierActor;
41 private DefaultConfigParamsImpl followerConfigParams;
44 public void testUnComittedEntryOnLeaderChange() throws Exception {
45 testLog.info("testUnComittedEntryOnLeaderChange starting");
49 // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry.
50 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
51 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
53 // Send a payload and verify AppendEntries is received in follower1.
54 MockPayload payload0 = sendPayloadData(leaderActor, "zero");
56 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
57 assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size());
58 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0);
60 // Kill the leader actor.
61 killActor(leaderActor);
63 // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not
64 // received the payload entry yet.
65 assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
66 assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex());
67 assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex());
68 assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied());
70 assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size());
72 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
73 clearMessages(follower1NotifierActor);
75 // Force follower1 to start an election. It should win since it's journal is more up-to-date than
76 // follower2's journal.
77 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
79 // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
80 List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
81 assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
82 assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
83 assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
85 final long previousTerm = currentTerm;
86 currentTerm = follower1Context.getTermInformation().getCurrentTerm();
88 // Since it went to Leader, it should've appended and successfully replicated a NoopPaylod with the
89 // new term to follower2 and committed both entries, including the first payload from the previous term.
90 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
91 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
92 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
93 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
94 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
96 // Both entries should be applied to the state.
97 expectMatching(follower1CollectorActor, ApplyState.class, 2);
98 expectMatching(follower2CollectorActor, ApplyState.class, 2);
100 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
102 // Verify follower2's journal matches follower1's.
103 assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size());
104 assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex());
105 assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex());
106 assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied());
107 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
108 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
110 // Reinstate follower1.
111 killActor(follower1Actor);
113 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
114 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
115 .config(followerConfigParams));
116 follower1Actor.underlyingActor().waitForRecoveryComplete();
117 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
119 // Verify follower1's journal was persisted and recovered correctly.
120 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
121 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
122 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
123 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
124 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
125 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
127 testLog.info("testUnComittedEntryOnLeaderChange ending");
130 private void createRaftActors() {
131 testLog.info("createRaftActors starting");
133 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
134 factory.generateActorId(follower1Id + "-notifier"));
136 followerConfigParams = newFollowerConfigParams();
137 followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
138 followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
139 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
140 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
141 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
143 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
144 follower1Id, testActorPath(follower1Id)), followerConfigParams);
146 peerAddresses = ImmutableMap.<String, String>builder()
147 .put(follower1Id, follower1Actor.path().toString())
148 .put(follower2Id, follower2Actor.path().toString()).build();
150 leaderConfigParams = newLeaderConfigParams();
151 leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
152 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
154 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
155 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
156 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
158 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
159 waitUntilLeader(leaderActor);
161 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
162 expectFirstMatching(follower1CollectorActor, AppendEntries.class);
164 clearMessages(leaderCollectorActor);
165 clearMessages(follower1CollectorActor);
166 clearMessages(follower2CollectorActor);
168 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
169 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
171 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
172 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
174 testLog.info("createRaftActors ending");