/*
 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import com.google.common.collect.ImmutableMap;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.notifications.RoleChanged;
21 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
26 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
27 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
28 import scala.concurrent.duration.FiniteDuration;
/**
 * Tests PreLeader raft state functionality end-to-end.
 *
 * @author Thomas Pantelis
 */
35 public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest {
37 private ActorRef follower1NotifierActor;
38 private DefaultConfigParamsImpl followerConfigParams;
41 public void testUnComittedEntryOnLeaderChange() {
42 testLog.info("testUnComittedEntryOnLeaderChange starting");
46 // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry.
47 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
48 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
50 // Send a payload and verify AppendEntries is received in follower1.
51 MockPayload payload0 = sendPayloadData(leaderActor, "zero");
53 AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
54 assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size());
55 verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0);
57 // Kill the leader actor.
58 killActor(leaderActor);
60 // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not
61 // received the payload entry yet.
62 assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
63 assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex());
64 assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex());
65 assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied());
67 assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size());
69 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
70 clearMessages(follower1NotifierActor);
72 // Force follower1 to start an election. It should win since it's journal is more up-to-date than
73 // follower2's journal.
74 follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
76 // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
77 List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
78 assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
79 assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
80 assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
82 final long previousTerm = currentTerm;
83 currentTerm = follower1Context.getTermInformation().getCurrentTerm();
85 // Since it went to Leader, it should've appended and successfully replicated a NoopPaylod with the
86 // new term to follower2 and committed both entries, including the first payload from the previous term.
87 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
88 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
89 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
90 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
91 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
93 // Both entries should be applied to the state.
94 expectMatching(follower1CollectorActor, ApplyState.class, 2);
95 expectMatching(follower2CollectorActor, ApplyState.class, 2);
97 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
99 // Verify follower2's journal matches follower1's.
100 assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size());
101 assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex());
102 assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex());
103 assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied());
104 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
105 verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
107 // Reinstate follower1.
108 killActor(follower1Actor);
110 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
111 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
112 .config(followerConfigParams));
113 follower1Actor.underlyingActor().waitForRecoveryComplete();
114 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
116 // Verify follower1's journal was persisted and recovered correctly.
117 assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
118 assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
119 assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
120 assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
121 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
122 verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
124 testLog.info("testUnComittedEntryOnLeaderChange ending");
127 private void createRaftActors() {
128 testLog.info("createRaftActors starting");
130 follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
131 factory.generateActorId(follower1Id + "-notifier"));
133 followerConfigParams = newFollowerConfigParams();
134 followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
135 followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
136 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
137 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
138 .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
140 follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
141 follower1Id, testActorPath(follower1Id)), followerConfigParams);
143 peerAddresses = ImmutableMap.<String, String>builder()
144 .put(follower1Id, follower1Actor.path().toString())
145 .put(follower2Id, follower2Actor.path().toString()).build();
147 leaderConfigParams = newLeaderConfigParams();
148 leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
149 leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
151 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
152 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
153 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
155 leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
156 waitUntilLeader(leaderActor);
158 expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
159 expectFirstMatching(follower1CollectorActor, AppendEntries.class);
161 clearMessages(leaderCollectorActor);
162 clearMessages(follower1CollectorActor);
163 clearMessages(follower2CollectorActor);
165 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
166 currentTerm = leaderContext.getTermInformation().getCurrentTerm();
168 follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
169 follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
171 testLog.info("createRaftActors ending - follower1: {}, follower2: {}", follower1Id, follower2Id);